add workflow partner1site,dev

This commit is contained in:
root 2025-09-30 16:36:10 +08:00
parent c042a0330e
commit 0f07760461
2 changed files with 143 additions and 143 deletions

View File

@@ -1,142 +1,142 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Partner1site 全接口抓取脚本
分页结束条件hasNext == False
"""
import random
import hmac
import hashlib
import base64
import requests
import json
import uuid
from datetime import datetime, timezone, timedelta
from typing import Dict, Any
import psycopg2
# ======= 配置区 =======
ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc"
SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5"
PG_DSN = dict(
database="dataops_db",
user="dbuser_dba",
password="EmBRxnmmjnE3",
host="124.221.232.219",
port="5432"
)
API_ID = "2460976d-00c1-47d9-84b2-33e66d68" # 外部传入 api_id占位符
# ======================
class Partner1SiteClient:
"""Partner1site API 客户端"""
def __init__(self, access_key: str, secret_key: str):
self.ak = access_key
self.sk = secret_key
@staticmethod
def urlsafe_b64encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode()
def gen_token(self, expire_sec: int = 600) -> str:
"""生成 API Token"""
random_num = str(random.randint(100000, 999999))
deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec
parm_str = f"{random_num}:{deadline}"
enc_parm = self.urlsafe_b64encode(parm_str.encode())
sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest()
enc_sign = self.urlsafe_b64encode(sign.hex().encode())
return f"{self.ak}:{enc_sign}:{enc_parm}"
def fetch_all_pages(self, api_name: str, params: Dict[str, Any]):
"""通用分页请求结束条件hasNext == False"""
# if api_name not in BASE_URLS:
# raise ValueError(f"未知 API 数据来源: {api_name}")
base_url = 'http://onesite.tek.cn/api/summary/visits'
all_data = []
page_num = 0
page_size = 1000 # 固定每页大小
while True:
token = self.gen_token()
params_with_paging = dict(params)
params_with_paging.update({
"token": token,
"size": page_size,
"page": page_num
})
resp = requests.get(base_url, params=params_with_paging, timeout=30)
resp.raise_for_status()
data_json = resp.json()
if data_json.get("code") != 100 or not data_json.get("success", False):
raise RuntimeError(f"获取客户拜访数据 API 错误: {data_json.get('message')}")
content = data_json.get("data", {}).get("content", [])
all_data.extend(content)
total_elements = data_json.get("data", {}).get("totalElements")
has_next = data_json.get("data", {}).get("hasNext", False)
print(f"[获取客户拜访数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}")
if not has_next:
break
page_num += 1
return all_data
def save_json_to_pg(data: list, api_id: str) -> None:
"""写入 PostgreSQL软删历史 + 插入新数据"""
print(f"[save_to_pg] API={api_id} 写入 PG记录数={len(data)}")
sql = """
UPDATE data_api.api_data
SET is_loaded = '1'
WHERE api_id = %s;
INSERT INTO data_api.api_data
(id, api_id, data, total_num, is_loaded, status,
request_tm, execute_tm, remark)
VALUES (%s, %s, %s, %s, '0', '0',
current_timestamp(0), current_timestamp(0), '');
"""
try:
with psycopg2.connect(**PG_DSN) as conn:
with conn.cursor() as cur:
cur.execute(sql,
(api_id,
str(uuid.uuid4()),
api_id,
json.dumps(data, ensure_ascii=False),
len(data)))
conn.commit()
print(f"[save_to_pg] API={api_id} 写入完成")
except Exception as e:
raise RuntimeError(f"PG写入错误: {e}")
def get_previous_date(days: int = 0) -> str:
return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
def main():
client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)
data = client.fetch_all_pages(
api_name="visits",
params={'startInsertDate':'2025-8-20',} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
)
save_json_to_pg(data, API_ID)
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Partner1site 全接口抓取脚本
分页结束条件hasNext == False
"""
import random
import hmac
import hashlib
import base64
import requests
import json
import uuid
from datetime import datetime, timezone, timedelta
from typing import Dict, Any
import psycopg2
# ======= 配置区 =======
ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc"
SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5"
PG_DSN = dict(
database="dataops_db",
user="dbuser_dba",
password="EmBRxnmmjnE3",
host="124.221.232.219",
port="5432"
)
API_ID = "2460976d-00c1-47d9-84b2-33e66d68" # 外部传入 api_id占位符
# ======================
class Partner1SiteClient:
"""Partner1site API 客户端"""
def __init__(self, access_key: str, secret_key: str):
self.ak = access_key
self.sk = secret_key
@staticmethod
def urlsafe_b64encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode()
def gen_token(self, expire_sec: int = 600) -> str:
"""生成 API Token"""
random_num = str(random.randint(100000, 999999))
deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec
parm_str = f"{random_num}:{deadline}"
enc_parm = self.urlsafe_b64encode(parm_str.encode())
sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest()
enc_sign = self.urlsafe_b64encode(sign.hex().encode())
return f"{self.ak}:{enc_sign}:{enc_parm}"
def fetch_all_pages(self, api_name: str, params: Dict[str, Any]):
"""通用分页请求结束条件hasNext == False"""
# if api_name not in BASE_URLS:
# raise ValueError(f"未知 API 数据来源: {api_name}")
base_url = 'http://onesite.tek.cn/api/summary/visits'
all_data = []
page_num = 0
page_size = 1000 # 固定每页大小
while True:
token = self.gen_token()
params_with_paging = dict(params)
params_with_paging.update({
"token": token,
"size": page_size,
"page": page_num
})
resp = requests.get(base_url, params=params_with_paging, timeout=30)
resp.raise_for_status()
data_json = resp.json()
if data_json.get("code") != 100 or not data_json.get("success", False):
raise RuntimeError(f"获取客户拜访数据 API 错误: {data_json.get('message')}")
content = data_json.get("data", {}).get("content", [])
all_data.extend(content)
total_elements = data_json.get("data", {}).get("totalElements")
has_next = data_json.get("data", {}).get("hasNext", False)
print(f"[获取客户拜访数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}")
if not has_next:
break
page_num += 1
return all_data
def save_json_to_pg(data: list, api_id: str) -> None:
"""写入 PostgreSQL软删历史 + 插入新数据"""
print(f"[save_to_pg] API={api_id} 写入 PG记录数={len(data)}")
sql = """
UPDATE data_api.api_data
SET is_loaded = '1'
WHERE api_id = %s;
INSERT INTO data_api.api_data
(id, api_id, data, total_num, is_loaded, status,
request_tm, execute_tm, remark)
VALUES (%s, %s, %s, %s, '0', '0',
current_timestamp(0), current_timestamp(0), '');
"""
try:
with psycopg2.connect(**PG_DSN) as conn:
with conn.cursor() as cur:
cur.execute(sql,
(api_id,
str(uuid.uuid4()),
api_id,
json.dumps(data, ensure_ascii=False),
len(data)))
conn.commit()
print(f"[save_to_pg] API={api_id} 写入完成")
except Exception as e:
raise RuntimeError(f"PG写入错误: {e}")
def get_previous_date(days: int = 0) -> str:
return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
def main():
client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)
data = client.fetch_all_pages(
api_name="visits",
params={'startInsertDate':'2000-1-1'} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
)
save_json_to_pg(data, API_ID)
if __name__ == "__main__":
main()

View File

@@ -132,7 +132,7 @@ def main():
data = client.fetch_all_pages(
api_name="reports",
params={"startInsertDate":"2000-01-01"} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
params={'startInsertDate':'2000-1-1'} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
)