add workflow partner1site,dev

This commit is contained in:
root 2025-10-20 14:11:12 +08:00
parent 4aa1470e27
commit da247b855d
1 changed files with 142 additions and 142 deletions

View File

@ -1,142 +1,142 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Partner1site 全接口抓取脚本 Partner1site 全接口抓取脚本
分页结束条件hasNext == False 分页结束条件hasNext == False
""" """
import random import random
import hmac import hmac
import hashlib import hashlib
import base64 import base64
import requests import requests
import json import json
import uuid import uuid
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from typing import Dict, Any from typing import Dict, Any
import psycopg2 import psycopg2
# ======= Configuration =======
# Every setting can be overridden through an environment variable; the
# literals below are only fallbacks that keep the script's previous behaviour
# when nothing is set.
# NOTE(review): these fallbacks are live-looking credentials committed to the
# repository — rotate them and drop the defaults once the environment
# variables are provisioned.
import os

ACCESS_KEY = os.environ.get(
    "PARTNER1SITE_ACCESS_KEY", "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc"
)
SECRET_KEY = os.environ.get(
    "PARTNER1SITE_SECRET_KEY", "117347a7dd066a50a4d2973c5f3d5ba9101094c5"
)
# Keyword arguments passed verbatim to psycopg2.connect().
PG_DSN = dict(
    database=os.environ.get("PARTNER1SITE_PG_DATABASE", "dataops_db"),
    user=os.environ.get("PARTNER1SITE_PG_USER", "dbuser_dba"),
    password=os.environ.get("PARTNER1SITE_PG_PASSWORD", "EmBRxnmmjnE3"),
    host=os.environ.get("PARTNER1SITE_PG_HOST", "124.221.232.219"),
    port=os.environ.get("PARTNER1SITE_PG_PORT", "5432"),
)
# Placeholder for the api_id that is supplied from outside the script.
API_ID = os.environ.get("PARTNER1SITE_API_ID", "2460976d-00c1-47d9-84b2-33e66d68")
# =============================
class Partner1SiteClient:
    """Client for the Partner1site HTTP API.

    Signs requests with an access-key / secret-key pair and pages through
    the visits summary endpoint until the server reports no further page.
    """

    def __init__(self, access_key: str, secret_key: str):
        """Store the key pair used to build request tokens."""
        self.ak = access_key
        self.sk = secret_key

    @staticmethod
    def urlsafe_b64encode(data: bytes) -> str:
        """Return the URL-safe Base64 encoding of ``data`` as text."""
        return base64.urlsafe_b64encode(data).decode()

    def gen_token(self, expire_sec: int = 600) -> str:
        """Build an API token of the form ``<ak>:<signature>:<payload>``.

        ``payload`` is the URL-safe Base64 of ``"<6-digit nonce>:<deadline>"``
        where the deadline is the current UTC epoch second plus
        ``expire_sec``. ``signature`` is the URL-safe Base64 of the *hex
        text* of the HMAC-SHA1 of the payload keyed with the secret key.

        Args:
            expire_sec: Number of seconds added to the current time to form
                the deadline embedded in the token.

        Returns:
            The token string.
        """
        random_num = str(random.randint(100000, 999999))
        deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec
        enc_parm = self.urlsafe_b64encode(f"{random_num}:{deadline}".encode())
        # The hex digest text (not the raw digest bytes) is what gets
        # Base64-encoded; keep it that way so the token format is unchanged.
        sign_hex = hmac.new(
            self.sk.encode(), enc_parm.encode(), hashlib.sha1
        ).hexdigest()
        enc_sign = self.urlsafe_b64encode(sign_hex.encode())
        return f"{self.ak}:{enc_sign}:{enc_parm}"

    def fetch_all_pages(self, api_name: str, params: Dict[str, Any], *, session=None):
        """Fetch every page of the visits summary endpoint.

        Pages are requested starting at page 0 with a fixed page size of
        1000; paging stops as soon as a response reports ``hasNext`` as
        false (or omits it).

        Args:
            api_name: Currently unused; the visits endpoint is always
                queried. Kept so existing callers keep working.
            params: Extra query parameters sent with every page request.
                ``None`` is treated as no extra parameters. The ``token``,
                ``size`` and ``page`` keys are overwritten per request.
            session: Optional object exposing a ``requests``-compatible
                ``get(url, params=..., timeout=...)`` (for example a
                ``requests.Session`` for connection reuse). Defaults to the
                ``requests`` module itself.

        Returns:
            A list with the ``content`` items of all pages, in page order.

        Raises:
            RuntimeError: If a response has ``code != 100`` or a falsy
                ``success`` flag.
            requests.HTTPError: Propagated from ``raise_for_status()`` when
                the default ``requests`` transport is used.
        """
        base_url = 'http://onesite.tek.cn/api/summary/visits'
        page_size = 1000  # fixed page size
        http = requests if session is None else session
        all_data = []
        page_num = 0
        while True:
            # A fresh token is generated for every request.
            query = dict(params or {})
            query.update({
                "token": self.gen_token(),
                "size": page_size,
                "page": page_num,
            })
            resp = http.get(base_url, params=query, timeout=30)
            resp.raise_for_status()
            data_json = resp.json()
            if data_json.get("code") != 100 or not data_json.get("success", False):
                raise RuntimeError(f"获取客户拜访数据 API 错误: {data_json.get('message')}")
            # `data` / `content` may be present but null; treat both as empty
            # instead of failing with AttributeError / TypeError.
            page = data_json.get("data") or {}
            content = page.get("content") or []
            all_data.extend(content)
            total_elements = page.get("totalElements")
            has_next = page.get("hasNext", False)
            print(f"[获取客户拜访数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}")
            if not has_next:
                break
            page_num += 1
        return all_data
def save_json_to_pg(data: list, api_id: str) -> None:
    """Store one fetch result in PostgreSQL, retiring earlier rows first.

    Inside a single transaction, every existing ``data_api.api_data`` row for
    ``api_id`` is flagged ``is_loaded = '1'`` and one new row is inserted
    holding ``data`` serialised as JSON, with ``is_loaded = '0'``.

    Args:
        data: Records to store; serialised with ``json.dumps``.
        api_id: Identifier written to the ``api_id`` column and used to
            select the rows to retire.

    Raises:
        RuntimeError: If connecting or writing fails; the underlying
            exception is attached as ``__cause__``.
    """
    print(f"[save_to_pg] API={api_id} 写入 PG记录数={len(data)}")
    retire_sql = """
        UPDATE data_api.api_data
        SET is_loaded = '1'
        WHERE api_id = %s;
    """
    insert_sql = """
        INSERT INTO data_api.api_data
        (id, api_id, data, total_num, is_loaded, status,
         request_tm, execute_tm, remark)
        VALUES (%s, %s, %s, %s, '0', '0',
                current_timestamp(0), current_timestamp(0), '');
    """
    try:
        conn = psycopg2.connect(**PG_DSN)
        try:
            # `with conn` commits on success and rolls back on error, but it
            # does NOT close the connection — hence the explicit finally.
            with conn:
                with conn.cursor() as cur:
                    cur.execute(retire_sql, (api_id,))
                    cur.execute(
                        insert_sql,
                        (
                            str(uuid.uuid4()),
                            api_id,
                            json.dumps(data, ensure_ascii=False),
                            len(data),
                        ),
                    )
        finally:
            conn.close()
        print(f"[save_to_pg] API={api_id} 写入完成")
    except Exception as e:
        # Keep the original error as the cause so the traceback stays useful.
        raise RuntimeError(f"PG写入错误: {e}") from e
def get_previous_date(days: int = 0) -> str:
    """Return the local date ``days`` days before now as ``YYYY-MM-DD``."""
    target = datetime.now() - timedelta(days=days)
    return f"{target:%Y-%m-%d}"
def main():
    """Fetch recent visit records from Partner1site and store them in PostgreSQL."""
    # Only records inserted during the last 7 days are requested. To pull a
    # wider range, pass e.g.
    # {"startInsertDate": "2000-01-01", "endInsertDate": get_previous_date(0)}.
    query = {'startInsertDate': get_previous_date(7)}
    client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)
    records = client.fetch_all_pages(api_name="visits", params=query)
    save_json_to_pg(records, API_ID)


if __name__ == "__main__":
    main()