From 684465c71f3a362541c89df1957d793fb7be073b Mon Sep 17 00:00:00 2001 From: root Date: Sun, 28 Sep 2025 15:07:55 +0800 Subject: [PATCH] add workflow partner1site,dev --- .../part_summary_visit_feign.py | 350 +++++++++--------- 1 file changed, 177 insertions(+), 173 deletions(-) diff --git a/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_feign.py b/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_feign.py index f65db1b..f57ad59 100644 --- a/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_feign.py +++ b/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_feign.py @@ -1,173 +1,177 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -partner1site 多数据来源 API 请求模块 -支持自动分页,并存储到 PostgreSQL -""" - -import random -import hmac -import hashlib -import base64 -import requests -import json -import uuid -from datetime import datetime, timezone, timedelta -from typing import List, Dict, Any -import psycopg2 - -# ======= 配置区 ======= -ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc" -SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5" - -# URL 用占位符,外部替换 -BASE_URLS = { - "visits": "http://onesite.tek.cn/api/summary/visits", # 客户拜访数据 - # "reports": "http://onesite.tek.cn/api/summary/visits", # 报备数据 - # "pos_datas": "http://onesite.tek.cn/api/summary/visits", # POS数据 - # "customer_and_contact_datas": "http://onesite.tek.cn/api/summary/visits" # 客户及联系人数据 -} - -PG_DSN = dict( - database="dataops_db", - user="dbuser_dba", - password="EmBRxnmmjnE3", - host="124.221.232.219", - port="5432" -) -# ====================== - - -class Partner1SiteClient: - """ - partner1site API 客户端 - """ - - def __init__(self, access_key: str, secret_key: str): - self.ak = access_key - self.sk = secret_key - - @staticmethod - def urlsafe_b64encode(data: bytes) -> str: - """URL安全Base64编码""" - return base64.urlsafe_b64encode(data).decode() - - def gen_token(self, expire_sec: int = 600) -> str: - """生成 token —— 按最初调通的算法修复""" - random_num = str(random.randint(100000, 999999)) - deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec - parm_str = f"{random_num}:{deadline}" - # 参数字符串 URL 安全 Base64 编码 - enc_parm = self.urlsafe_b64encode(parm_str.encode()) - # HMAC-SHA1 签名(对 enc_parm 编码进行) - sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest() - # digest 转成十六进制字符串再 URL 安全 Base64 编码 - enc_sign = self.urlsafe_b64encode(sign.hex().encode()) - return f"{self.ak}:{enc_sign}:{enc_parm}" - - def fetch_all_pages(self, api_name: str, params: Dict[str, Any]): - """通用分页请求""" - if api_name not in BASE_URLS: - raise ValueError(f"未知 API 数据来源: {api_name}") - - base_url = BASE_URLS[api_name] - all_data = [] - page_num = 0 - page_size = 50 # 固定页容量 - - while True: - token = self.gen_token() - params_with_paging = dict(params) - params_with_paging.update({ - "token": token, - "size": page_size, - "page": page_num - }) - - resp = requests.get(base_url, params=params_with_paging, timeout=15) - resp.raise_for_status() - data_json = resp.json() - - if data_json.get("code") != 100 or not data_json.get("success", False): - raise RuntimeError(f"{api_name} API 错误: {data_json.get('message')}") - - content = data_json.get("data", {}).get("content", []) - all_data.extend(content) - print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)} 条") - - if data_json.get("data", {}).get("last", True): - break - - page_num += 1 - - return all_data - - -def save_json_to_pg(data: list, api_id: str) -> None: - """写入 PostgreSQL:软删历史 + 插入新数据""" - print(f"[save_to_pg] API={api_id} 写入 PG,记录数={len(data)}") - sql = """ - UPDATE data_api.api_data - SET is_loaded = '1' - WHERE api_id = %s; - - INSERT INTO data_api.api_data - (id, api_id, data, total_num, is_loaded, status, - request_tm, execute_tm, remark) - VALUES (%s, %s, %s, %s, '0', '0', - current_timestamp(0), current_timestamp(0), ''); - """ - try: - with psycopg2.connect(**PG_DSN) as conn: - with conn.cursor() as cur: - cur.execute(sql, - (api_id, - str(uuid.uuid4()), - api_id, - json.dumps(data, ensure_ascii=False), - len(data))) - conn.commit() - print(f"[save_to_pg] API={api_id} 写入完成") - except Exception as e: - raise RuntimeError(f"PG写入错误: {e}") - - -def get_previous_date(days: int = 0) -> str: - """获取 N 天前的日期字符串""" - return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") - - -def main(): - client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY) - - # 客户拜访数据 - visits_data = client.fetch_all_pages( - api_name="visits", - params={"startInsertDate": get_previous_date(5), "endInsertDate": get_previous_date(0)} - ) - save_json_to_pg(visits_data, "partner1site_visits") - """ - # 报备数据 - reports_data = client.fetch_all_pages( - api_name="reports", - params={"startApplyDate": get_previous_date(5), "endApplyDate": get_previous_date(0)} - ) - save_json_to_pg(reports_data, "partner1site_reports") - - # POS 数据 - pos_data = client.fetch_all_pages( - api_name="pos_datas", - params={"startPosInsertDate": get_previous_date(10), "endPosInsertDate": get_previous_date(0)} - ) - save_json_to_pg(pos_data, "partner1site_pos_datas") - - # 客户及联系人数据 - cust_contact_data = client.fetch_all_pages( - api_name="customer_and_contact_datas", - params={"customerId": 0, "customerContactId": 0} - ) - save_json_to_pg(cust_contact_data, "partner1site_cust_contact") - """ - -if __name__ == "__main__": - main() +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +partner1site 多数据来源 API 请求模块 +支持自动分页,并存储到 PostgreSQL +""" + +import random +import hmac +import hashlib +import base64 +import requests +import json +import uuid +from datetime import datetime, timezone, timedelta +from typing import List, Dict, Any +import psycopg2 + +# ======= 配置区 ======= +ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc" +SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5" + +# URL 用占位符,外部替换 +BASE_URLS = { + "visits": "http://onesite.tek.cn/api/summary/visits", # 客户拜访数据 + "reports": "http://onesite.tek.cn/api/summary/visits", # 报备数据 + "pos_datas": "http://onesite.tek.cn/api/summary/visits", # POS数据 + "customer_and_contact_datas": "http://onesite.tek.cn/api/summary/visits" # 客户及联系人数据 +} + +PG_DSN = dict( + database="dataops_db", + user="dbuser_dba", + password="EmBRxnmmjnE3", + host="124.221.232.219", + port="5432" +) + +API_ID = "2460976d-00c1-47d9-84b2-33e66d68" # 外部传入 api_id,占位符 +# ====================== + + +class Partner1SiteClient: + """ + partner1site API 客户端 + """ + + def __init__(self, access_key: str, secret_key: str): + self.ak = access_key + self.sk = secret_key + + @staticmethod + def urlsafe_b64encode(data: bytes) -> str: + return base64.urlsafe_b64encode(data).decode() + + def gen_token(self, expire_sec: int = 600) -> str: + """生成 token —— 按调通算法""" + random_num = str(random.randint(100000, 999999)) + deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec + parm_str = f"{random_num}:{deadline}" + enc_parm = self.urlsafe_b64encode(parm_str.encode()) + sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest() + enc_sign = self.urlsafe_b64encode(sign.hex().encode()) + return f"{self.ak}:{enc_sign}:{enc_parm}" + + def fetch_all_pages(self, api_name: str, params: Dict[str, Any]): + """通用分页请求""" + if api_name not in BASE_URLS: + raise ValueError(f"未知 API 数据来源: {api_name}") + + base_url = BASE_URLS[api_name] + all_data = [] + page_num = 0 + page_size = 50 # 固定页容量 + + while True: + token = self.gen_token() + params_with_paging = dict(params) + params_with_paging.update({ + "token": token, + "size": page_size, + "page": page_num + }) + + resp = requests.get(base_url, params=params_with_paging, timeout=15) + resp.raise_for_status() + data_json = resp.json() + + if data_json.get("code") != 100 or not data_json.get("success", False): + raise RuntimeError(f"{api_name} API 错误: {data_json.get('message')}") + + content = data_json.get("data", {}).get("content", []) + all_data.extend(content) + print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)} 条") + + if data_json.get("data", {}).get("last", True): + break + + page_num += 1 + + return all_data + + +def save_json_to_pg(data: list, api_id: str) -> None: + """写入 PostgreSQL:软删历史 + 插入新数据""" + print(f"[save_to_pg] API={api_id} 写入 PG,记录数={len(data)}") + sql = """ + UPDATE data_api.api_data + SET is_loaded = '1' + WHERE api_id = %s; + + INSERT INTO data_api.api_data + (id, api_id, data, total_num, is_loaded, status, + request_tm, execute_tm, remark) + VALUES (%s, %s, %s, %s, '0', '0', + current_timestamp(0), current_timestamp(0), ''); + """ + try: + with psycopg2.connect(**PG_DSN) as conn: + with conn.cursor() as cur: + cur.execute(sql, + (api_id, + str(uuid.uuid4()), + api_id, + json.dumps(data, ensure_ascii=False), + len(data))) + conn.commit() + print(f"[save_to_pg] API={api_id} 写入完成") + except Exception as e: + raise RuntimeError(f"PG写入错误: {e}") + + +def get_previous_date(days: int = 0) -> str: + """获取 N 天前的日期字符串""" + return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") + + +def main(): + client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY) + + # ✅ 1️⃣ 客户拜访数据 + visits_data = client.fetch_all_pages( + api_name="visits", + params={"startInsertDate": get_previous_date(5), "endInsertDate": get_previous_date(0)} + ) + save_json_to_pg(visits_data, API_ID) + + # ❌ 2️⃣ 报备数据(注释掉,后续可启用) + """ + reports_data = client.fetch_all_pages( + api_name="reports", + params={"startApplyDate": get_previous_date(5), "endApplyDate": get_previous_date(0)} + ) + save_json_to_pg(reports_data, API_ID) + """ + + # ❌ 3️⃣ POS 数据(注释掉,后续可启用) + """ + pos_data = client.fetch_all_pages( + api_name="pos_datas", + params={"startPosInsertDate": get_previous_date(10), "endPosInsertDate": get_previous_date(0)} + ) + save_json_to_pg(pos_data, API_ID) + """ + + # ❌ 4️⃣ 客户及联系人数据(注释掉,后续可启用) + """ + cust_contact_data = client.fetch_all_pages( + api_name="customer_and_contact_datas", + params={"customerId": 0, "customerContactId": 0} + ) + save_json_to_pg(cust_contact_data, API_ID) + """ + + +if __name__ == "__main__": + main()