add workflow 市场易API联系人,dev

This commit is contained in:
root 2025-10-29 18:26:38 +08:00
parent 82a38c022f
commit ee4eae17c3
2 changed files with 201 additions and 198 deletions

View File

@ -1,195 +1,195 @@
# coding: utf-8 # coding: utf-8
import requests import requests
import json import json
import datetime as dt import datetime as dt
import psycopg2 import psycopg2
import uuid import uuid
""" """
获取指定时间段前的时间 获取指定时间段前的时间
:param h: 时间段 :param h: 时间段
:return: 时间 :return: 时间
""" """
def formatted2_previous_hour(h, format="%Y-%m-%d %H:%M:%S"): def formatted2_previous_hour(h, format="%Y-%m-%d %H:%M:%S"):
if h == 0: if h == 0:
return dt.datetime.now().strftime(format) return dt.datetime.now().strftime(format)
start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0) start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间 # 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h) start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h)
return start_of_previous_hour.strftime(format) return start_of_previous_hour.strftime(format)
""" """
获取token 获取token
:return: token :return: token
""" """
def get_token(): def get_token():
url = "https://open.cloud.custouch.com/platform/cdp/token" url = "https://open.cloud.custouch.com/platform/cdp/token"
token_payload = { token_payload = {
"grant_type": "client_credentials", "grant_type": "client_credentials",
"app_key": "e9b240eb3a9848e89a96c5e7857794da", "app_key": "e9b240eb3a9848e89a96c5e7857794da",
"app_secret": "f8cb7069e7dd468888e360bf8c259fc6", "app_secret": "f8cb7069e7dd468888e360bf8c259fc6",
"scope": "openid", "scope": "openid",
} }
headers = {"Content-Type": "application/x-www-form-urlencoded"} headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.request("POST", url, headers=headers, data=token_payload) response = requests.request("POST", url, headers=headers, data=token_payload)
if response.status_code != 200: if response.status_code != 200:
raise Exception("获取token失败") raise Exception("获取token失败")
res = json.loads(response.text) res = json.loads(response.text)
return res["access_token"] return res["access_token"]
""" """
获取API数据 获取API数据
:param token: token :param token: token
:return: contact_ids :return: contact_ids
""" """
def fetch_data(token, index=1, pageSize=2000): def fetch_data(token, index=1, pageSize=2000):
url = "https://open.cloud.custouch.com/platform/cdp/contact/event" url = "https://open.cloud.custouch.com/platform/cdp/contact/event"
params = { params = {
"connectIds": [], "connectIds": [],
"eventMeta": [ "eventMeta": [
{"id": 200001}, {"id": 200001},
{"id": 200003}, {"id": 200003},
{"id": 200004}, {"id": 200004},
{"id": 201003}, {"id": 201003},
{"id": 204001}, {"id": 204001},
{"id": 204003}, {"id": 204003},
{"id": 204004}, {"id": 204004},
{"id": 206001}, {"id": 206001},
{"id": 209002}, {"id": 209002},
{"id": 214001}, {"id": 214001},
], ],
"desc": True, "desc": True,
"from": formatted2_previous_hour(1440, "%Y-%m-%dT%H:%M:%SZ"), "from": formatted2_previous_hour(1440, "%Y-%m-%dT%H:%M:%SZ"),
"index": index, "index": index,
"size": pageSize "size": pageSize
} }
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
print(params) print(params)
response = requests.request("POST", url, headers=headers, data=json.dumps(params)) response = requests.request("POST", url, headers=headers, data=json.dumps(params))
if response.status_code != 200: if response.status_code != 200:
print(f"获取新建联系人ID失败响应状态码{response}") print(f"获取新建联系人ID失败响应状态码{response}")
raise Exception("获取新建联系人ID失败") raise Exception("获取新建联系人ID失败")
res = json.loads(response.text) res = json.loads(response.text)
return res return res
def fetch_all_data(token, pageSize=2000): def fetch_all_data(token, pageSize=2000):
""" """
获取所有分页数据 获取所有分页数据
:param token: 授权token :param token: 授权token
:param pageSize: 每页大小默认2000 :param pageSize: 每页大小默认2000
:return: 所有数据列表 :return: 所有数据列表
""" """
all_data = [] all_data = []
index = 1 index = 1
# 先获取第一页数据以确定总页数 # 先获取第一页数据以确定总页数
first_result = fetch_data(token, index, pageSize) first_result = fetch_data(token, index, pageSize)
total = first_result.get('total', 0) total = first_result.get('total', 0)
total_pages = (total + pageSize - 1) // pageSize # 向上取整计算总页数 total_pages = (total + pageSize - 1) // pageSize # 向上取整计算总页数
print(f"总共 {total} 条数据,共 {total_pages}") print(f"总共 {total} 条数据,共 {total_pages}")
while True: while True:
# 如果不是第一页,获取当前页数据 # 如果不是第一页,获取当前页数据
if index == 1: if index == 1:
result = first_result result = first_result
else: else:
result = fetch_data(token, index, pageSize) result = fetch_data(token, index, pageSize)
# 提取当前页的数据 # 提取当前页的数据
current_data = result.get('data', []) current_data = result.get('data', [])
if not current_data: if not current_data:
print(f"{index} 页无数据,结束获取") print(f"{index} 页无数据,结束获取")
break break
all_data.extend(current_data) all_data.extend(current_data)
print(f"已获取第 {index}/{total_pages} 页,当前页 {len(current_data)} 条数据,累计 {len(all_data)} 条数据") print(f"已获取第 {index}/{total_pages} 页,当前页 {len(current_data)} 条数据,累计 {len(all_data)} 条数据")
# 检查是否还有更多数据 # 检查是否还有更多数据
if len(all_data) >= total: if len(all_data) >= total:
print("已获取所有数据") print("已获取所有数据")
break break
index += 1 index += 1
return all_data return all_data
PG_DSN = dict( PG_DSN = dict(
database="dataops_db", database="dataops_db",
user="dbuser_dba", user="dbuser_dba",
password="EmBRxnmmjnE3", password="EmBRxnmmjnE3",
host="124.221.232.219", host="124.221.232.219",
port="5432", port="5432",
) )
def save_json_to_pg(data: list, api_id: str) -> None: def save_json_to_pg(data: list, api_id: str) -> None:
"""把列表落库:先软删历史,再插入新批次""" """把列表落库:先软删历史,再插入新批次"""
print("[save_to_pg] 写入 PG...") print("[save_to_pg] 写入 PG...")
sql = """ sql = """
UPDATE data_api.api_data UPDATE data_api.api_data
SET is_loaded = '1' SET is_loaded = '1'
WHERE api_id = %s; WHERE api_id = %s;
INSERT INTO data_api.api_data INSERT INTO data_api.api_data
(id, api_id, data, total_num, is_loaded, status, (id, api_id, data, total_num, is_loaded, status,
request_tm, execute_tm, remark) request_tm, execute_tm, remark)
VALUES (%s, %s, %s, %s, '0', '0', VALUES (%s, %s, %s, %s, '0', '0',
current_timestamp(0), current_timestamp(0), ''); current_timestamp(0), current_timestamp(0), '');
""" """
try: try:
with psycopg2.connect(**PG_DSN) as conn: with psycopg2.connect(**PG_DSN) as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute( cur.execute(
sql, sql,
( (
api_id, api_id,
str(uuid.uuid4()), str(uuid.uuid4()),
api_id, api_id,
json.dumps(data, ensure_ascii=False), json.dumps(data, ensure_ascii=False),
len(data), len(data),
), ),
) )
conn.commit() conn.commit()
cur.close() cur.close()
print("[save_to_pg] 写入完成") print("[save_to_pg] 写入完成")
except psycopg2.Error as e: except psycopg2.Error as e:
print(f"[save_to_pg] 数据库错误: {e}") print(f"[save_to_pg] 数据库错误: {e}")
raise raise
except Exception as e: except Exception as e:
print(f"[save_to_pg] 未知错误: {e}") print(f"[save_to_pg] 未知错误: {e}")
raise raise
finally: finally:
if "conn" in locals(): if "conn" in locals():
conn.close() conn.close()
def main() -> None: def main() -> None:
"""主流程""" """主流程"""
# print(get_token()) # print(get_token())
print(f"开始请求新建联系人信息:{formatted2_previous_hour(0)}") print(f"开始请求新建联系人信息:{formatted2_previous_hour(0)}")
token = get_token() token = get_token()
# print(token) # print(token)
# 获取新建联系人ID # 获取新建联系人ID
print(f"开始请求新建联系人ID:{formatted2_previous_hour(0)}") print(f"开始请求新建联系人ID:{formatted2_previous_hour(0)}")
objs = fetch_all_data(token) objs = fetch_all_data(token)
# 保存联系人ID # 保存联系人ID
apiId = "a7757b4a-7038-40ef-b11e-81a2c5e0" apiId = "a7757b4a-7038-40ef-b11e-81a2c5e0"
save_json_to_pg(objs, apiId) save_json_to_pg(objs, apiId)
print(f"结束请求联系人详情:{formatted2_previous_hour(0)}") print(f"结束请求联系人详情:{formatted2_previous_hour(0)}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -16,7 +16,8 @@ insert into data_api.api_contact_events (
, meta_id , meta_id
, meta_name , meta_name
, meta_remark , meta_remark
, properties , properties
, contact_id
,etl_tx_dt ,etl_tx_dt
) )
select select
@ -30,7 +31,8 @@ select
, case when trim(both from meta_id)='' then null else meta_id::text end meta_id , case when trim(both from meta_id)='' then null else meta_id::text end meta_id
, case when trim(both from meta_name)='' then null else meta_name::text end meta_name , case when trim(both from meta_name)='' then null else meta_name::text end meta_name
, case when trim(both from meta_remark)='' then null else meta_remark::text end meta_remark , case when trim(both from meta_remark)='' then null else meta_remark::text end meta_remark
, case when trim(both from properties)='' then null else properties::text end properties , case when trim(both from properties)='' then null else properties::text end properties
, case when trim(both from contact_id)='' then null else contact_id::text end contact_id
,etl_tx_dt ,etl_tx_dt
from ( from (
select select
@ -44,7 +46,8 @@ select
, (json_array_elements(data::json)::json->>'metaId') meta_id , (json_array_elements(data::json)::json->>'metaId') meta_id
, (json_array_elements(data::json)::json->>'metaName') meta_name , (json_array_elements(data::json)::json->>'metaName') meta_name
, (json_array_elements(data::json)::json->>'metaRemark') meta_remark , (json_array_elements(data::json)::json->>'metaRemark') meta_remark
, (json_array_elements(data::json)::json->>'properties') properties , (json_array_elements(data::json)::json->>'properties') properties
, (json_array_elements(data::json)::json->>'contactId') contact_id
,CURRENT_TIMESTAMP(0) etl_tx_dt ,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data from (select * from data_api.api_data
WHERE api_id='a7757b4a-7038-40ef-b11e-81a2c5e0' and is_loaded = '0' order by request_tm desc limit 1) p )p; WHERE api_id='a7757b4a-7038-40ef-b11e-81a2c5e0' and is_loaded = '0' order by request_tm desc limit 1) p )p;