add workflow partner1site,dev

This commit is contained in:
root 2025-09-28 15:18:36 +08:00
parent 684465c71f
commit 19f9a062cc
1 changed files with 178 additions and 177 deletions

View File

@ -1,177 +1,178 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
partner1site 多数据来源 API 请求模块
支持自动分页并存储到 PostgreSQL
"""
import random
import hmac
import hashlib
import base64
import requests
import json
import uuid
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any
import psycopg2
# ======= 配置区 =======
ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc"
SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5"
# URL 用占位符,外部替换
BASE_URLS = {
"visits": "http://onesite.tek.cn/api/summary/visits", # 客户拜访数据
"reports": "http://onesite.tek.cn/api/summary/visits", # 报备数据
"pos_datas": "http://onesite.tek.cn/api/summary/visits", # POS数据
"customer_and_contact_datas": "http://onesite.tek.cn/api/summary/visits" # 客户及联系人数据
}
PG_DSN = dict(
database="dataops_db",
user="dbuser_dba",
password="EmBRxnmmjnE3",
host="124.221.232.219",
port="5432"
)
API_ID = "2460976d-00c1-47d9-84b2-33e66d68" # 外部传入 api_id占位符
# ======================
class Partner1SiteClient:
"""
partner1site API 客户端
"""
def __init__(self, access_key: str, secret_key: str):
self.ak = access_key
self.sk = secret_key
@staticmethod
def urlsafe_b64encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode()
def gen_token(self, expire_sec: int = 600) -> str:
"""生成 token —— 按调通算法"""
random_num = str(random.randint(100000, 999999))
deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec
parm_str = f"{random_num}:{deadline}"
enc_parm = self.urlsafe_b64encode(parm_str.encode())
sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest()
enc_sign = self.urlsafe_b64encode(sign.hex().encode())
return f"{self.ak}:{enc_sign}:{enc_parm}"
def fetch_all_pages(self, api_name: str, params: Dict[str, Any]):
"""通用分页请求"""
if api_name not in BASE_URLS:
raise ValueError(f"未知 API 数据来源: {api_name}")
base_url = BASE_URLS[api_name]
all_data = []
page_num = 0
page_size = 50 # 固定页容量
while True:
token = self.gen_token()
params_with_paging = dict(params)
params_with_paging.update({
"token": token,
"size": page_size,
"page": page_num
})
resp = requests.get(base_url, params=params_with_paging, timeout=15)
resp.raise_for_status()
data_json = resp.json()
if data_json.get("code") != 100 or not data_json.get("success", False):
raise RuntimeError(f"{api_name} API 错误: {data_json.get('message')}")
content = data_json.get("data", {}).get("content", [])
all_data.extend(content)
print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)}")
if data_json.get("data", {}).get("last", True):
break
page_num += 1
return all_data
def save_json_to_pg(data: list, api_id: str) -> None:
"""写入 PostgreSQL软删历史 + 插入新数据"""
print(f"[save_to_pg] API={api_id} 写入 PG记录数={len(data)}")
sql = """
UPDATE data_api.api_data
SET is_loaded = '1'
WHERE api_id = %s;
INSERT INTO data_api.api_data
(id, api_id, data, total_num, is_loaded, status,
request_tm, execute_tm, remark)
VALUES (%s, %s, %s, %s, '0', '0',
current_timestamp(0), current_timestamp(0), '');
"""
try:
with psycopg2.connect(**PG_DSN) as conn:
with conn.cursor() as cur:
cur.execute(sql,
(api_id,
str(uuid.uuid4()),
api_id,
json.dumps(data, ensure_ascii=False),
len(data)))
conn.commit()
print(f"[save_to_pg] API={api_id} 写入完成")
except Exception as e:
raise RuntimeError(f"PG写入错误: {e}")
def get_previous_date(days: int = 0) -> str:
"""获取 N 天前的日期字符串"""
return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
def main():
client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)
# ✅ 1⃣ 客户拜访数据
visits_data = client.fetch_all_pages(
api_name="visits",
params={"startInsertDate": get_previous_date(5), "endInsertDate": get_previous_date(0)}
)
save_json_to_pg(visits_data, API_ID)
# ❌ 2⃣ 报备数据(注释掉,后续可启用)
"""
reports_data = client.fetch_all_pages(
api_name="reports",
params={"startApplyDate": get_previous_date(5), "endApplyDate": get_previous_date(0)}
)
save_json_to_pg(reports_data, API_ID)
"""
# ❌ 3⃣ POS 数据(注释掉,后续可启用)
"""
pos_data = client.fetch_all_pages(
api_name="pos_datas",
params={"startPosInsertDate": get_previous_date(10), "endPosInsertDate": get_previous_date(0)}
)
save_json_to_pg(pos_data, API_ID)
"""
# ❌ 4⃣ 客户及联系人数据(注释掉,后续可启用)
"""
cust_contact_data = client.fetch_all_pages(
api_name="customer_and_contact_datas",
params={"customerId": 0, "customerContactId": 0}
)
save_json_to_pg(cust_contact_data, API_ID)
"""
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
partner1site 多数据来源 API 请求模块
支持自动分页并存储到 PostgreSQL
"""
import random
import hmac
import hashlib
import base64
import requests
import json
import uuid
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any
import psycopg2
# ======= 配置区 =======
ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc"
SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5"
# URL 用占位符,外部替换
BASE_URLS = {
"visits": "http://onesite.tek.cn/api/summary/visits", # 客户拜访数据
"reports": "http://onesite.tek.cn/api/summary/visits", # 报备数据
"pos_datas": "http://onesite.tek.cn/api/summary/visits", # POS数据
"customer_and_contact_datas": "http://onesite.tek.cn/api/summary/visits" # 客户及联系人数据
}
PG_DSN = dict(
database="dataops_db",
user="dbuser_dba",
password="EmBRxnmmjnE3",
host="124.221.232.219",
port="5432"
)
API_ID = "2460976d-00c1-47d9-84b2-33e66d68" # 外部传入 api_id占位符
# ======================
class Partner1SiteClient:
"""
partner1site API 客户端
"""
def __init__(self, access_key: str, secret_key: str):
self.ak = access_key
self.sk = secret_key
@staticmethod
def urlsafe_b64encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode()
def gen_token(self, expire_sec: int = 600) -> str:
"""生成 token —— 按调通算法"""
random_num = str(random.randint(100000, 999999))
deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec
parm_str = f"{random_num}:{deadline}"
enc_parm = self.urlsafe_b64encode(parm_str.encode())
sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest()
enc_sign = self.urlsafe_b64encode(sign.hex().encode())
return f"{self.ak}:{enc_sign}:{enc_parm}"
def fetch_all_pages(self, api_name: str, params: Dict[str, Any]):
"""通用分页请求"""
if api_name not in BASE_URLS:
raise ValueError(f"未知 API 数据来源: {api_name}")
base_url = BASE_URLS[api_name]
all_data = []
page_num = 0
page_size = 50 # 固定页容量
while True:
token = self.gen_token()
params_with_paging = dict(params)
params_with_paging.update({
"token": token,
"size": page_size,
"page": page_num
})
resp = requests.get(base_url, params=params_with_paging, timeout=15)
resp.raise_for_status()
data_json = resp.json()
if data_json.get("code") != 100 or not data_json.get("success", False):
raise RuntimeError(f"{api_name} API 错误: {data_json.get('message')}")
content = data_json.get("data", {}).get("content", [])
all_data.extend(content)
print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)}")
if data_json.get("data", {}).get("last", True):
break
page_num += 1
return all_data
def save_json_to_pg(data: list, api_id: str) -> None:
"""写入 PostgreSQL软删历史 + 插入新数据"""
print(f"[save_to_pg] API={api_id} 写入 PG记录数={len(data)}")
sql = """
UPDATE data_api.api_data
SET is_loaded = '1'
WHERE api_id = %s;
INSERT INTO data_api.api_data
(id, api_id, data, total_num, is_loaded, status,
request_tm, execute_tm, remark)
VALUES (%s, %s, %s, %s, '0', '0',
current_timestamp(0), current_timestamp(0), '');
"""
try:
with psycopg2.connect(**PG_DSN) as conn:
with conn.cursor() as cur:
cur.execute(sql,
(api_id,
str(uuid.uuid4()),
api_id,
json.dumps(data, ensure_ascii=False),
len(data)))
conn.commit()
print(f"[save_to_pg] API={api_id} 写入完成")
except Exception as e:
raise RuntimeError(f"PG写入错误: {e}")
def get_previous_date(days: int = 0) -> str:
"""获取 N 天前的日期字符串"""
return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
def main():
client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)
# ✅ 1⃣ 客户拜访数据
visits_data = client.fetch_all_pages(
api_name="visits",
# params={"startInsertDate": get_previous_date(5), "endInsertDate": get_previous_date(0)}
params={}
)
save_json_to_pg(visits_data, API_ID)
# ❌ 2⃣ 报备数据(注释掉,后续可启用)
"""
reports_data = client.fetch_all_pages(
api_name="reports",
params={"startApplyDate": get_previous_date(5), "endApplyDate": get_previous_date(0)}
)
save_json_to_pg(reports_data, API_ID)
"""
# ❌ 3⃣ POS 数据(注释掉,后续可启用)
"""
pos_data = client.fetch_all_pages(
api_name="pos_datas",
params={"startPosInsertDate": get_previous_date(10), "endPosInsertDate": get_previous_date(0)}
)
save_json_to_pg(pos_data, API_ID)
"""
# ❌ 4⃣ 客户及联系人数据(注释掉,后续可启用)
"""
cust_contact_data = client.fetch_all_pages(
api_name="customer_and_contact_datas",
params={"customerId": 0, "customerContactId": 0}
)
save_json_to_pg(cust_contact_data, API_ID)
"""
if __name__ == "__main__":
main()