add workflow 市场易API联系人,dev

This commit is contained in:
root 2025-10-11 15:22:43 +08:00
parent 44683973fc
commit e3863e31ab
3 changed files with 187 additions and 66 deletions

View File

@ -48,7 +48,7 @@ def get_token():
def get_contact_ids(token):
url = "https://open.cloud.custouch.com/platform/cdp/contact/batch"
params = {
'start': formatted2_previous_hour(72),
'start': formatted2_previous_hour(720),
'end': formatted2_previous_hour(0),
'by':'CreatedAt'
}

View File

@ -48,7 +48,7 @@ def get_token():
def get_contact_ids(token):
url = "https://open.cloud.custouch.com/platform/cdp/contact/batch"
params = {
'start': formatted2_previous_hour(72),
'start': formatted2_previous_hour(720),
'end': formatted2_previous_hour(0),
'by':'UpdatedAt',
}

View File

@ -1,74 +1,195 @@
# coding: utf-8
import requests
import json
import datetime as dt
import psycopg2
import uuid
import datetime
import time
import hashlib
import time
#荟聚
"""
获取指定时间段前的时间
:param h: 时间段
:return: 时间
"""
#全局变量,便于参数使用的预设值
current_date = datetime.date.today() # 获取当前日期
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
timestamp = time.time() # 为Unix time即从"1970-01-01 00:00:00"至今的秒数;
sign_version = 'v2' # 签名版本号固定值v2
nonce = str(uuid.uuid4())
#获取签名令牌
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
# 按照指定的格式拼接字符串
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
# 使用SHA256算法计算哈希值
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
return sha256_hash
def formatted2_previous_hour(h, format="%Y-%m-%d %H:%M:%S"):
if h == 0:
return dt.datetime.now().strftime(format)
start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h)
return start_of_previous_hour.strftime(format)
#获取鉴权token
def get_token(url):
#请求鉴权接口
authRequest=requests.get(url)
#解析结果
if not authRequest: #若为空时,返回空
return
auth=json.loads(authRequest.text)
return auth
print('开始加载数据api_contact_events:获取联系人事件')
authUrl='https://open.cloud.custouch.com/platform/cdp/token'
print('开始请求令牌。')
#authRequest=requests.get(authUrl)
#auth=json.loads(authRequest.text)
auth = get_token(authUrl)
#循环判断auth是否为空若为空等待30s后重新请求
i = 0
while 'error' in auth and i < 60:
time.sleep(60)
auth = get_token(authUrl)
i = i + 1
print('开始请求数据总数。')
url='https://open.cloud.custouch.com/platform/cdp/contact/event'
header={}
body={'app_secret':auth['app_secret'],'app_key':auth['app_key'],'contactId':'',}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = 'a7757b4a-7038-40ef-b11e-81a2c5e0';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'a7757b4a-7038-40ef-b11e-81a2c5e0',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束api_contact_events:获取联系人事件')
"""
获取token
:return: token
"""
def get_token():
url = "https://open.cloud.custouch.com/platform/cdp/token"
token_payload = {
"grant_type": "client_credentials",
"app_key": "e9b240eb3a9848e89a96c5e7857794da",
"app_secret": "f8cb7069e7dd468888e360bf8c259fc6",
"scope": "openid",
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.request("POST", url, headers=headers, data=token_payload)
if response.status_code != 200:
raise Exception("获取token失败")
res = json.loads(response.text)
return res["access_token"]
"""
获取API数据
:param token: token
:return: contact_ids
"""
def fetch_data(token, index=1, pageSize=2000):
url = "https://open.cloud.custouch.com/platform/cdp/contact/event"
params = {
"connectIds": [],
"eventMeta": [
{"id": 200001},
{"id": 200003},
{"id": 200004},
{"id": 201003},
{"id": 204001},
{"id": 204003},
{"id": 204004},
{"id": 206001},
{"id": 209002},
{"id": 214001},
],
"desc": True,
"from": formatted2_previous_hour(720, "%Y-%m-%dT%H:%M:%SZ"),
"index": index,
"size": pageSize
}
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
print(params)
response = requests.request("POST", url, headers=headers, data=json.dumps(params))
if response.status_code != 200:
print(f"获取新建联系人ID失败响应状态码{response}")
raise Exception("获取新建联系人ID失败")
res = json.loads(response.text)
return res
def fetch_all_data(token, pageSize=2000):
"""
获取所有分页数据
:param token: 授权token
:param pageSize: 每页大小默认2000
:return: 所有数据列表
"""
all_data = []
index = 1
# 先获取第一页数据以确定总页数
first_result = fetch_data(token, index, pageSize)
total = first_result.get('total', 0)
total_pages = (total + pageSize - 1) // pageSize # 向上取整计算总页数
print(f"总共 {total} 条数据,共 {total_pages}")
while True:
# 如果不是第一页,获取当前页数据
if index == 1:
result = first_result
else:
result = fetch_data(token, index, pageSize)
# 提取当前页的数据
current_data = result.get('data', [])
if not current_data:
print(f"{index} 页无数据,结束获取")
break
all_data.extend(current_data)
print(f"已获取第 {index}/{total_pages} 页,当前页 {len(current_data)} 条数据,累计 {len(all_data)} 条数据")
# 检查是否还有更多数据
if len(all_data) >= total:
print("已获取所有数据")
break
index += 1
return all_data
PG_DSN = dict(
database="dataops_db",
user="dbuser_dba",
password="EmBRxnmmjnE3",
host="124.221.232.219",
port="5432",
)
def save_json_to_pg(data: list, api_id: str) -> None:
"""把列表落库:先软删历史,再插入新批次"""
print("[save_to_pg] 写入 PG...")
sql = """
UPDATE data_api.api_data
SET is_loaded = '1'
WHERE api_id = %s;
INSERT INTO data_api.api_data
(id, api_id, data, total_num, is_loaded, status,
request_tm, execute_tm, remark)
VALUES (%s, %s, %s, %s, '0', '0',
current_timestamp(0), current_timestamp(0), '');
"""
try:
with psycopg2.connect(**PG_DSN) as conn:
with conn.cursor() as cur:
cur.execute(
sql,
(
api_id,
str(uuid.uuid4()),
api_id,
json.dumps(data, ensure_ascii=False),
len(data),
),
)
conn.commit()
cur.close()
print("[save_to_pg] 写入完成")
except psycopg2.Error as e:
print(f"[save_to_pg] 数据库错误: {e}")
raise
except Exception as e:
print(f"[save_to_pg] 未知错误: {e}")
raise
finally:
if "conn" in locals():
conn.close()
def main() -> None:
"""主流程"""
# print(get_token())
print(f"开始请求新建联系人信息:{formatted2_previous_hour(0)}")
token = get_token()
# print(token)
# 获取新建联系人ID
print(f"开始请求新建联系人ID:{formatted2_previous_hour(0)}")
objs = fetch_all_data(token)
# 保存联系人ID
apiId = "a7757b4a-7038-40ef-b11e-81a2c5e0"
save_json_to_pg(objs, apiId)
print(f"结束请求联系人详情:{formatted2_previous_hour(0)}")
if __name__ == "__main__":
main()