From e3863e31abfdfc6dd0237f1fec93e089007daf0b Mon Sep 17 00:00:00 2001 From: root Date: Sat, 11 Oct 2025 15:22:43 +0800 Subject: [PATCH] =?UTF-8?q?add=20workflow=20=E5=B8=82=E5=9C=BA=E6=98=93API?= =?UTF-8?q?=E8=81=94=E7=B3=BB=E4=BA=BA,dev?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../contact_create_list_feign.py | 2 +- .../contact_update_feign.py | 2 +- .../api_contact_events_feign.py | 249 +++++++++++++----- 3 files changed, 187 insertions(+), 66 deletions(-) diff --git a/dev/workflow/TK_Cust/yi_api_contact/获取新创建联系人信息/contact_create_list_feign.py b/dev/workflow/TK_Cust/yi_api_contact/获取新创建联系人信息/contact_create_list_feign.py index 13527a9..c8fbf79 100644 --- a/dev/workflow/TK_Cust/yi_api_contact/获取新创建联系人信息/contact_create_list_feign.py +++ b/dev/workflow/TK_Cust/yi_api_contact/获取新创建联系人信息/contact_create_list_feign.py @@ -48,7 +48,7 @@ def get_token(): def get_contact_ids(token): url = "https://open.cloud.custouch.com/platform/cdp/contact/batch" params = { - 'start': formatted2_previous_hour(72), + 'start': formatted2_previous_hour(720), 'end': formatted2_previous_hour(0), 'by':'CreatedAt' } diff --git a/dev/workflow/TK_Cust/yi_api_contact/获取更新联系人信息/contact_update_feign.py b/dev/workflow/TK_Cust/yi_api_contact/获取更新联系人信息/contact_update_feign.py index 79975c3..880369a 100644 --- a/dev/workflow/TK_Cust/yi_api_contact/获取更新联系人信息/contact_update_feign.py +++ b/dev/workflow/TK_Cust/yi_api_contact/获取更新联系人信息/contact_update_feign.py @@ -48,7 +48,7 @@ def get_token(): def get_contact_ids(token): url = "https://open.cloud.custouch.com/platform/cdp/contact/batch" params = { - 'start': formatted2_previous_hour(72), + 'start': formatted2_previous_hour(720), 'end': formatted2_previous_hour(0), 'by':'UpdatedAt', } diff --git a/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_feign.py b/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_feign.py index 5319643..2c66258 100644 --- a/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_feign.py +++ b/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_feign.py @@ -1,74 +1,195 @@ # coding: utf-8 import requests import json +import datetime as dt import psycopg2 import uuid -import datetime -import time -import hashlib -import time -#荟聚 +""" +获取指定时间段前的时间 +:param h: 时间段 +:return: 时间 +""" -#全局变量,便于参数使用的预设值 -current_date = datetime.date.today() # 获取当前日期 -previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期 -formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化 -formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化 -timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数; -sign_version = 'v2' # 签名版本号,固定值v2 -nonce = str(uuid.uuid4()) -#获取签名令牌 -def sign_data(email, open_api_token, timestamp, nonce, sign_version): - # 按照指定的格式拼接字符串 - data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}" - # 使用SHA256算法计算哈希值 - sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest() - return sha256_hash +def formatted2_previous_hour(h, format="%Y-%m-%d %H:%M:%S"): + if h == 0: + return dt.datetime.now().strftime(format) + start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h) + return start_of_previous_hour.strftime(format) -#获取鉴权token -def get_token(url): - #请求鉴权接口 - authRequest=requests.get(url) - #解析结果 - if not authRequest: #若为空时,返回空 - return - auth=json.loads(authRequest.text) - return auth -print('开始加载数据:api_contact_events:获取联系人事件') -authUrl='https://open.cloud.custouch.com/platform/cdp/token' - -print('开始请求令牌。') -#authRequest=requests.get(authUrl) -#auth=json.loads(authRequest.text) -auth = get_token(authUrl) -#循环判断auth是否为空,若为空,等待30s后重新请求 -i = 0 -while 'error' in auth and i < 60: - time.sleep(60) - auth = get_token(authUrl) - i = i + 1 -print('开始请求数据总数。') -url='https://open.cloud.custouch.com/platform/cdp/contact/event' -header={} -body={'app_secret':auth['app_secret'],'app_key':auth['app_key'],'contactId':'',} -dataReqL=requests.get(url,headers=header,params=body) -resL=json.loads(dataReqL.text) -# print(resL) -dataList=resL['data'] -total=len(dataList) -conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA", - host="172.17.0.8", port="5432") -print('数据库连接成功') -dataId=str(uuid.uuid4()) -print('临时id:'+dataId) -json_object = json.dumps(dataList) -cur=conn.cursor() -sql="update data_api.api_data set is_loaded = '1' where api_id = 'a7757b4a-7038-40ef-b11e-81a2c5e0';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')" -cur.execute(sql,[dataId,'a7757b4a-7038-40ef-b11e-81a2c5e0',json_object,total]) -conn.commit() -cur.close() -conn.close() -print('加载数据结束:api_contact_events:获取联系人事件') \ No newline at end of file +""" +获取token +:return: token +""" + + +def get_token(): + url = "https://open.cloud.custouch.com/platform/cdp/token" + token_payload = { + "grant_type": "client_credentials", + "app_key": "e9b240eb3a9848e89a96c5e7857794da", + "app_secret": "f8cb7069e7dd468888e360bf8c259fc6", + "scope": "openid", + } + headers = {"Content-Type": "application/x-www-form-urlencoded"} + response = requests.request("POST", url, headers=headers, data=token_payload) + if response.status_code != 200: + raise Exception("获取token失败") + + res = json.loads(response.text) + return res["access_token"] + + +""" +获取API数据 +:param token: token +:return: contact_ids +""" + + +def fetch_data(token, index=1, pageSize=2000): + url = "https://open.cloud.custouch.com/platform/cdp/contact/event" + params = { + "connectIds": [], + "eventMeta": [ + {"id": 200001}, + {"id": 200003}, + {"id": 200004}, + {"id": 201003}, + {"id": 204001}, + {"id": 204003}, + {"id": 204004}, + {"id": 206001}, + {"id": 209002}, + {"id": 214001}, + ], + "desc": True, + "from": formatted2_previous_hour(720, "%Y-%m-%dT%H:%M:%SZ"), + "index": index, + "size": pageSize + } + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + print(params) + response = requests.request("POST", url, headers=headers, data=json.dumps(params)) + if response.status_code != 200: + print(f"获取新建联系人ID失败,响应状态码:{response}") + raise Exception("获取新建联系人ID失败") + + res = json.loads(response.text) + return res + + +def fetch_all_data(token, pageSize=2000): + """ + 获取所有分页数据 + :param token: 授权token + :param pageSize: 每页大小,默认2000 + :return: 所有数据列表 + """ + all_data = [] + index = 1 + + # 先获取第一页数据以确定总页数 + first_result = fetch_data(token, index, pageSize) + total = first_result.get('total', 0) + total_pages = (total + pageSize - 1) // pageSize # 向上取整计算总页数 + + print(f"总共 {total} 条数据,共 {total_pages} 页") + + while True: + # 如果不是第一页,获取当前页数据 + if index == 1: + result = first_result + else: + result = fetch_data(token, index, pageSize) + + # 提取当前页的数据 + current_data = result.get('data', []) + if not current_data: + print(f"第 {index} 页无数据,结束获取") + break + + all_data.extend(current_data) + print(f"已获取第 {index}/{total_pages} 页,当前页 {len(current_data)} 条数据,累计 {len(all_data)} 条数据") + + # 检查是否还有更多数据 + if len(all_data) >= total: + print("已获取所有数据") + break + + index += 1 + + return all_data + +PG_DSN = dict( + database="dataops_db", + user="dbuser_dba", + password="EmBRxnmmjnE3", + host="124.221.232.219", + port="5432", +) + + +def save_json_to_pg(data: list, api_id: str) -> None: + """把列表落库:先软删历史,再插入新批次""" + print("[save_to_pg] 写入 PG...") + sql = """ + UPDATE data_api.api_data + SET is_loaded = '1' + WHERE api_id = %s; + + INSERT INTO data_api.api_data + (id, api_id, data, total_num, is_loaded, status, + request_tm, execute_tm, remark) + VALUES (%s, %s, %s, %s, '0', '0', + current_timestamp(0), current_timestamp(0), ''); + """ + + try: + with psycopg2.connect(**PG_DSN) as conn: + with conn.cursor() as cur: + cur.execute( + sql, + ( + api_id, + str(uuid.uuid4()), + api_id, + json.dumps(data, ensure_ascii=False), + len(data), + ), + ) + conn.commit() + cur.close() + print("[save_to_pg] 写入完成") + except psycopg2.Error as e: + print(f"[save_to_pg] 数据库错误: {e}") + raise + except Exception as e: + print(f"[save_to_pg] 未知错误: {e}") + raise + finally: + if "conn" in locals(): + conn.close() + + +def main() -> None: + """主流程""" + # print(get_token()) + print(f"开始请求新建联系人信息:{formatted2_previous_hour(0)}") + token = get_token() + # print(token) + # 获取新建联系人ID + print(f"开始请求新建联系人ID:{formatted2_previous_hour(0)}") + objs = fetch_all_data(token) + # 保存联系人ID + apiId = "a7757b4a-7038-40ef-b11e-81a2c5e0" + save_json_to_pg(objs, apiId) + print(f"结束请求联系人详情:{formatted2_previous_hour(0)}") + + +if __name__ == "__main__": + main()