add workflow partner1site,dev

This commit is contained in:
root 2025-09-28 15:33:15 +08:00
parent 19f9a062cc
commit ee35021969
1 changed files with 23 additions and 23 deletions

View File

@ -1,8 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
partner1site 多数据来源 API 请求模块 Partner1site 全接口抓取脚本
支持自动分页并存储到 PostgreSQL 分页结束条件hasNext == False
""" """
import random import random
@ -13,14 +13,14 @@ import requests
import json import json
import uuid import uuid
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any from typing import Dict, Any
import psycopg2 import psycopg2
# ======= 配置区 ======= # ======= 配置区 =======
ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc" ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc"
SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5" SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5"
# URL 用占位符,外部替换 # URL 用占位符(外部替换)
BASE_URLS = { BASE_URLS = {
"visits": "http://onesite.tek.cn/api/summary/visits", # 客户拜访数据 "visits": "http://onesite.tek.cn/api/summary/visits", # 客户拜访数据
"reports": "http://onesite.tek.cn/api/summary/visits", # 报备数据 "reports": "http://onesite.tek.cn/api/summary/visits", # 报备数据
@ -41,9 +41,7 @@ API_ID = "2460976d-00c1-47d9-84b2-33e66d68" # 外部传入 api_id占位符
class Partner1SiteClient: class Partner1SiteClient:
""" """Partner1site API 客户端"""
partner1site API 客户端
"""
def __init__(self, access_key: str, secret_key: str): def __init__(self, access_key: str, secret_key: str):
self.ak = access_key self.ak = access_key
@ -54,7 +52,7 @@ class Partner1SiteClient:
return base64.urlsafe_b64encode(data).decode() return base64.urlsafe_b64encode(data).decode()
def gen_token(self, expire_sec: int = 600) -> str: def gen_token(self, expire_sec: int = 600) -> str:
"""生成 token —— 按调通算法""" """生成 API Token"""
random_num = str(random.randint(100000, 999999)) random_num = str(random.randint(100000, 999999))
deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec
parm_str = f"{random_num}:{deadline}" parm_str = f"{random_num}:{deadline}"
@ -64,14 +62,14 @@ class Partner1SiteClient:
return f"{self.ak}:{enc_sign}:{enc_parm}" return f"{self.ak}:{enc_sign}:{enc_parm}"
def fetch_all_pages(self, api_name: str, params: Dict[str, Any]): def fetch_all_pages(self, api_name: str, params: Dict[str, Any]):
"""通用分页请求""" """通用分页请求结束条件hasNext == False"""
if api_name not in BASE_URLS: if api_name not in BASE_URLS:
raise ValueError(f"未知 API 数据来源: {api_name}") raise ValueError(f"未知 API 数据来源: {api_name}")
base_url = BASE_URLS[api_name] base_url = BASE_URLS[api_name]
all_data = [] all_data = []
page_num = 0 page_num = 0
page_size = 50 # 固定页容量 page_size = 50 # 固定每页大小
while True: while True:
token = self.gen_token() token = self.gen_token()
@ -82,7 +80,7 @@ class Partner1SiteClient:
"page": page_num "page": page_num
}) })
resp = requests.get(base_url, params=params_with_paging, timeout=15) resp = requests.get(base_url, params=params_with_paging, timeout=30)
resp.raise_for_status() resp.raise_for_status()
data_json = resp.json() data_json = resp.json()
@ -91,9 +89,12 @@ class Partner1SiteClient:
content = data_json.get("data", {}).get("content", []) content = data_json.get("data", {}).get("content", [])
all_data.extend(content) all_data.extend(content)
print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)}") total_elements = data_json.get("data", {}).get("totalElements")
has_next = data_json.get("data", {}).get("hasNext", False)
if data_json.get("data", {}).get("last", True): print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}")
if not has_next:
break break
page_num += 1 page_num += 1
@ -131,44 +132,43 @@ def save_json_to_pg(data: list, api_id: str) -> None:
def get_previous_date(days: int = 0) -> str: def get_previous_date(days: int = 0) -> str:
"""获取 N 天前的日期字符串"""
return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
def main(): def main():
client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY) client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)
# ✅ 1客户拜访数据 # ✅ 客户拜访数据(执行)
visits_data = client.fetch_all_pages( visits_data = client.fetch_all_pages(
api_name="visits", api_name="visits",
# params={"startInsertDate": get_previous_date(5), "endInsertDate": get_previous_date(0)} params={} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
params={} params={"startInsertDate":"2025-08-20","endInsertDate":get_previous_date(0)}
) )
save_json_to_pg(visits_data, API_ID) save_json_to_pg(visits_data, API_ID)
# ❌ 2报备数据(注释掉,后续可启用 # ❌ 报备数据(暂时注释)
""" """
reports_data = client.fetch_all_pages( reports_data = client.fetch_all_pages(
api_name="reports", api_name="reports",
params={"startApplyDate": get_previous_date(5), "endApplyDate": get_previous_date(0)} params={}
) )
save_json_to_pg(reports_data, API_ID) save_json_to_pg(reports_data, API_ID)
""" """
# ❌ 3POS 数据(注释掉,后续可启用 # ❌ POS 数据(暂时注释)
""" """
pos_data = client.fetch_all_pages( pos_data = client.fetch_all_pages(
api_name="pos_datas", api_name="pos_datas",
params={"startPosInsertDate": get_previous_date(10), "endPosInsertDate": get_previous_date(0)} params={}
) )
save_json_to_pg(pos_data, API_ID) save_json_to_pg(pos_data, API_ID)
""" """
# ❌ 4客户及联系人数据(注释掉,后续可启用 # ❌ 客户及联系人数据(暂时注释)
""" """
cust_contact_data = client.fetch_all_pages( cust_contact_data = client.fetch_all_pages(
api_name="customer_and_contact_datas", api_name="customer_and_contact_datas",
params={"customerId": 0, "customerContactId": 0} params={}
) )
save_json_to_pg(cust_contact_data, API_ID) save_json_to_pg(cust_contact_data, API_ID)
""" """