From ee4eae17c381035019b2483cd5c9bc12369df784 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 29 Oct 2025 18:26:38 +0800 Subject: [PATCH] =?UTF-8?q?add=20workflow=20=E5=B8=82=E5=9C=BA=E6=98=93API?= =?UTF-8?q?=E8=81=94=E7=B3=BB=E4=BA=BA,dev?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api_contact_events_feign.py | 390 +++++++++--------- .../api_contact_events_load.sql | 9 +- 2 files changed, 201 insertions(+), 198 deletions(-) diff --git a/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_feign.py b/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_feign.py index 7c0e98f..7b417c2 100644 --- a/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_feign.py +++ b/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_feign.py @@ -1,195 +1,195 @@ -# coding: utf-8 -import requests -import json -import datetime as dt -import psycopg2 -import uuid - -""" -获取指定时间段前的时间 -:param h: 时间段 -:return: 时间 -""" - - -def formatted2_previous_hour(h, format="%Y-%m-%d %H:%M:%S"): - if h == 0: - return dt.datetime.now().strftime(format) - start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0) - # 减去一个小时,得到前一个小时的开始时间 - start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h) - return start_of_previous_hour.strftime(format) - - -""" -获取token -:return: token -""" - - -def get_token(): - url = "https://open.cloud.custouch.com/platform/cdp/token" - token_payload = { - "grant_type": "client_credentials", - "app_key": "e9b240eb3a9848e89a96c5e7857794da", - "app_secret": "f8cb7069e7dd468888e360bf8c259fc6", - "scope": "openid", - } - headers = {"Content-Type": "application/x-www-form-urlencoded"} - response = requests.request("POST", url, headers=headers, data=token_payload) - if response.status_code != 200: - raise Exception("获取token失败") - - res = json.loads(response.text) - return res["access_token"] - - -""" -获取API数据 -:param token: token -:return: contact_ids -""" - - -def fetch_data(token, index=1, pageSize=2000): - url = "https://open.cloud.custouch.com/platform/cdp/contact/event" - params = { - "connectIds": [], - "eventMeta": [ - {"id": 200001}, - {"id": 200003}, - {"id": 200004}, - {"id": 201003}, - {"id": 204001}, - {"id": 204003}, - {"id": 204004}, - {"id": 206001}, - {"id": 209002}, - {"id": 214001}, - ], - "desc": True, - "from": formatted2_previous_hour(1440, "%Y-%m-%dT%H:%M:%SZ"), - "index": index, - "size": pageSize - } - headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} - print(params) - response = requests.request("POST", url, headers=headers, data=json.dumps(params)) - if response.status_code != 200: - print(f"获取新建联系人ID失败,响应状态码:{response}") - raise Exception("获取新建联系人ID失败") - - res = json.loads(response.text) - return res - - -def fetch_all_data(token, pageSize=2000): - """ - 获取所有分页数据 - :param token: 授权token - :param pageSize: 每页大小,默认2000 - :return: 所有数据列表 - """ - all_data = [] - index = 1 - - # 先获取第一页数据以确定总页数 - first_result = fetch_data(token, index, pageSize) - total = first_result.get('total', 0) - total_pages = (total + pageSize - 1) // pageSize # 向上取整计算总页数 - - print(f"总共 {total} 条数据,共 {total_pages} 页") - - while True: - # 如果不是第一页,获取当前页数据 - if index == 1: - result = first_result - else: - result = fetch_data(token, index, pageSize) - - # 提取当前页的数据 - current_data = result.get('data', []) - if not current_data: - print(f"第 {index} 页无数据,结束获取") - break - - all_data.extend(current_data) - print(f"已获取第 {index}/{total_pages} 页,当前页 {len(current_data)} 条数据,累计 {len(all_data)} 条数据") - - # 检查是否还有更多数据 - if len(all_data) >= total: - print("已获取所有数据") - break - - index += 1 - - return all_data - -PG_DSN = dict( - database="dataops_db", - user="dbuser_dba", - password="EmBRxnmmjnE3", - host="124.221.232.219", - port="5432", -) - - -def save_json_to_pg(data: list, api_id: str) -> None: - """把列表落库:先软删历史,再插入新批次""" - print("[save_to_pg] 写入 PG...") - sql = """ - UPDATE data_api.api_data - SET is_loaded = '1' - WHERE api_id = %s; - - INSERT INTO data_api.api_data - (id, api_id, data, total_num, is_loaded, status, - request_tm, execute_tm, remark) - VALUES (%s, %s, %s, %s, '0', '0', - current_timestamp(0), current_timestamp(0), ''); - """ - - try: - with psycopg2.connect(**PG_DSN) as conn: - with conn.cursor() as cur: - cur.execute( - sql, - ( - api_id, - str(uuid.uuid4()), - api_id, - json.dumps(data, ensure_ascii=False), - len(data), - ), - ) - conn.commit() - cur.close() - print("[save_to_pg] 写入完成") - except psycopg2.Error as e: - print(f"[save_to_pg] 数据库错误: {e}") - raise - except Exception as e: - print(f"[save_to_pg] 未知错误: {e}") - raise - finally: - if "conn" in locals(): - conn.close() - - -def main() -> None: - """主流程""" - # print(get_token()) - print(f"开始请求新建联系人信息:{formatted2_previous_hour(0)}") - token = get_token() - # print(token) - # 获取新建联系人ID - print(f"开始请求新建联系人ID:{formatted2_previous_hour(0)}") - objs = fetch_all_data(token) - # 保存联系人ID - apiId = "a7757b4a-7038-40ef-b11e-81a2c5e0" - save_json_to_pg(objs, apiId) - print(f"结束请求联系人详情:{formatted2_previous_hour(0)}") - - -if __name__ == "__main__": - main() +# coding: utf-8 +import requests +import json +import datetime as dt +import psycopg2 +import uuid + +""" +获取指定时间段前的时间 +:param h: 时间段 +:return: 时间 +""" + + +def formatted2_previous_hour(h, format="%Y-%m-%d %H:%M:%S"): + if h == 0: + return dt.datetime.now().strftime(format) + start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h) + return start_of_previous_hour.strftime(format) + + +""" +获取token +:return: token +""" + + +def get_token(): + url = "https://open.cloud.custouch.com/platform/cdp/token" + token_payload = { + "grant_type": "client_credentials", + "app_key": "e9b240eb3a9848e89a96c5e7857794da", + "app_secret": "f8cb7069e7dd468888e360bf8c259fc6", + "scope": "openid", + } + headers = {"Content-Type": "application/x-www-form-urlencoded"} + response = requests.request("POST", url, headers=headers, data=token_payload) + if response.status_code != 200: + raise Exception("获取token失败") + + res = json.loads(response.text) + return res["access_token"] + + +""" +获取API数据 +:param token: token +:return: contact_ids +""" + + +def fetch_data(token, index=1, pageSize=2000): + url = "https://open.cloud.custouch.com/platform/cdp/contact/event" + params = { + "connectIds": [], + "eventMeta": [ + {"id": 200001}, + {"id": 200003}, + {"id": 200004}, + {"id": 201003}, + {"id": 204001}, + {"id": 204003}, + {"id": 204004}, + {"id": 206001}, + {"id": 209002}, + {"id": 214001}, + ], + "desc": True, + "from": formatted2_previous_hour(1440, "%Y-%m-%dT%H:%M:%SZ"), + "index": index, + "size": pageSize + } + headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} + print(params) + response = requests.request("POST", url, headers=headers, data=json.dumps(params)) + if response.status_code != 200: + print(f"获取新建联系人ID失败,响应状态码:{response}") + raise Exception("获取新建联系人ID失败") + + res = json.loads(response.text) + return res + + +def fetch_all_data(token, pageSize=2000): + """ + 获取所有分页数据 + :param token: 授权token + :param pageSize: 每页大小,默认2000 + :return: 所有数据列表 + """ + all_data = [] + index = 1 + + # 先获取第一页数据以确定总页数 + first_result = fetch_data(token, index, pageSize) + total = first_result.get('total', 0) + total_pages = (total + pageSize - 1) // pageSize # 向上取整计算总页数 + + print(f"总共 {total} 条数据,共 {total_pages} 页") + + while True: + # 如果不是第一页,获取当前页数据 + if index == 1: + result = first_result + else: + result = fetch_data(token, index, pageSize) + + # 提取当前页的数据 + current_data = result.get('data', []) + if not current_data: + print(f"第 {index} 页无数据,结束获取") + break + + all_data.extend(current_data) + print(f"已获取第 {index}/{total_pages} 页,当前页 {len(current_data)} 条数据,累计 {len(all_data)} 条数据") + + # 检查是否还有更多数据 + if len(all_data) >= total: + print("已获取所有数据") + break + + index += 1 + + return all_data + +PG_DSN = dict( + database="dataops_db", + user="dbuser_dba", + password="EmBRxnmmjnE3", + host="124.221.232.219", + port="5432", +) + + +def save_json_to_pg(data: list, api_id: str) -> None: + """把列表落库:先软删历史,再插入新批次""" + print("[save_to_pg] 写入 PG...") + sql = """ + UPDATE data_api.api_data + SET is_loaded = '1' + WHERE api_id = %s; + + INSERT INTO data_api.api_data + (id, api_id, data, total_num, is_loaded, status, + request_tm, execute_tm, remark) + VALUES (%s, %s, %s, %s, '0', '0', + current_timestamp(0), current_timestamp(0), ''); + """ + + try: + with psycopg2.connect(**PG_DSN) as conn: + with conn.cursor() as cur: + cur.execute( + sql, + ( + api_id, + str(uuid.uuid4()), + api_id, + json.dumps(data, ensure_ascii=False), + len(data), + ), + ) + conn.commit() + cur.close() + print("[save_to_pg] 写入完成") + except psycopg2.Error as e: + print(f"[save_to_pg] 数据库错误: {e}") + raise + except Exception as e: + print(f"[save_to_pg] 未知错误: {e}") + raise + finally: + if "conn" in locals(): + conn.close() + + +def main() -> None: + """主流程""" + # print(get_token()) + print(f"开始请求新建联系人信息:{formatted2_previous_hour(0)}") + token = get_token() + # print(token) + # 获取新建联系人ID + print(f"开始请求新建联系人ID:{formatted2_previous_hour(0)}") + objs = fetch_all_data(token) + # 保存联系人ID + apiId = "a7757b4a-7038-40ef-b11e-81a2c5e0" + save_json_to_pg(objs, apiId) + print(f"结束请求联系人详情:{formatted2_previous_hour(0)}") + + +if __name__ == "__main__": + main() diff --git a/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_load.sql b/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_load.sql index 462e8bc..a3c5193 100644 --- a/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_load.sql +++ b/dev/workflow/TK_Cust/yi_api_contact/获取联系人事件/api_contact_events_load.sql @@ -16,7 +16,8 @@ insert into data_api.api_contact_events ( , meta_id , meta_name , meta_remark - , properties + , properties + , contact_id ,etl_tx_dt ) select @@ -30,7 +31,8 @@ select , case when trim(both from meta_id)='' then null else meta_id::text end meta_id , case when trim(both from meta_name)='' then null else meta_name::text end meta_name , case when trim(both from meta_remark)='' then null else meta_remark::text end meta_remark - , case when trim(both from properties)='' then null else properties::text end properties + , case when trim(both from properties)='' then null else properties::text end properties + , case when trim(both from contact_id)='' then null else contact_id::text end contact_id ,etl_tx_dt from ( select @@ -44,7 +46,8 @@ select , (json_array_elements(data::json)::json->>'metaId') meta_id , (json_array_elements(data::json)::json->>'metaName') meta_name , (json_array_elements(data::json)::json->>'metaRemark') meta_remark - , (json_array_elements(data::json)::json->>'properties') properties + , (json_array_elements(data::json)::json->>'properties') properties + , (json_array_elements(data::json)::json->>'contactId') contact_id ,CURRENT_TIMESTAMP(0) etl_tx_dt from (select * from data_api.api_data WHERE api_id='a7757b4a-7038-40ef-b11e-81a2c5e0' and is_loaded = '0' order by request_tm desc limit 1) p )p;