From 34f4c09c232b5382638aa346296977aa9d15246b Mon Sep 17 00:00:00 2001 From: root Date: Tue, 2 Dec 2025 16:24:11 +0800 Subject: [PATCH] add workflow partner1site,dev --- .../S98_S_partner_summary_pos.sql | 16 + .../partner_summary_pos/sa_foreign_tables.sql | 4 + .../partner_summary_pos/sa_tables.sql | 16 + .../S98_S_partner_summary_visit.sql | 4 + .../sa_foreign_tables.sql | 1 + .../partner_summary_visit/sa_tables.sql | 4 + .../获取POS数据/part_summary_pos_feign.py | 16 +- .../获取POS数据/part_summary_pos_load.sql | 12 + .../part_summary_visit_feign.py | 285 +++++++++-------- .../part_summary_visit_load.sql | 3 + .../获取报备数据/part_summary_report_feign.py | 286 +++++++++--------- 11 files changed, 347 insertions(+), 300 deletions(-) diff --git a/dev/workflow/TK_Cust/partner1site/partner_summary_pos/S98_S_partner_summary_pos.sql b/dev/workflow/TK_Cust/partner1site/partner_summary_pos/S98_S_partner_summary_pos.sql index 0a50d1b..6fe841f 100644 --- a/dev/workflow/TK_Cust/partner1site/partner_summary_pos/S98_S_partner_summary_pos.sql +++ b/dev/workflow/TK_Cust/partner1site/partner_summary_pos/S98_S_partner_summary_pos.sql @@ -18,6 +18,7 @@ insert into p10_sa.S98_S_partner_summary_pos , contact_phone , contact_title , customer_chanel_name + , customer_id , customer_name , distributor_name , distributor_name2 @@ -25,12 +26,15 @@ insert into p10_sa.S98_S_partner_summary_pos , id , industry , invoice_number + , net_usd , online_name , online_or_offline , pos_insert_date , pos_invoice_date , pos_update_date , product_ap + , product_family_code + , product_family_name , product_qty , province , sales_name @@ -56,6 +60,7 @@ insert into p10_sa.S98_S_partner_summary_pos , contact_phone , contact_title , customer_chanel_name + , customer_id , customer_name , distributor_name , distributor_name2 @@ -63,12 +68,15 @@ insert into p10_sa.S98_S_partner_summary_pos , id , industry , invoice_number + , net_usd , online_name , online_or_offline , pos_insert_date , pos_invoice_date , pos_update_date , product_ap + , product_family_code + , product_family_name , product_qty , province , sales_name @@ -99,6 +107,7 @@ insert into p12_sfull.S98_S_partner_summary_pos , contact_phone , contact_title , customer_chanel_name + , customer_id , customer_name , distributor_name , distributor_name2 @@ -106,12 +115,15 @@ insert into p12_sfull.S98_S_partner_summary_pos , id , industry , invoice_number + , net_usd , online_name , online_or_offline , pos_insert_date , pos_invoice_date , pos_update_date , product_ap + , product_family_code + , product_family_name , product_qty , province , sales_name @@ -137,6 +149,7 @@ insert into p12_sfull.S98_S_partner_summary_pos , contact_phone , contact_title , customer_chanel_name + , customer_id , customer_name , distributor_name , distributor_name2 @@ -144,12 +157,15 @@ insert into p12_sfull.S98_S_partner_summary_pos , id , industry , invoice_number + , net_usd , online_name , online_or_offline , pos_insert_date , pos_invoice_date , pos_update_date , product_ap + , product_family_code + , product_family_name , product_qty , province , sales_name diff --git a/dev/workflow/TK_Cust/partner1site/partner_summary_pos/sa_foreign_tables.sql b/dev/workflow/TK_Cust/partner1site/partner_summary_pos/sa_foreign_tables.sql index a90fe5e..c849b2b 100644 --- a/dev/workflow/TK_Cust/partner1site/partner_summary_pos/sa_foreign_tables.sql +++ b/dev/workflow/TK_Cust/partner1site/partner_summary_pos/sa_foreign_tables.sql @@ -13,6 +13,7 @@ CREATE FOREIGN TABLE if not exists p00_tal.S98_S_partner_summary_pos ( , contact_phone TEXT , contact_title TEXT , customer_chanel_name TEXT + , customer_id TEXT , customer_name TEXT , distributor_name TEXT , distributor_name2 TEXT @@ -20,12 +21,15 @@ CREATE FOREIGN TABLE if not exists p00_tal.S98_S_partner_summary_pos ( , id TEXT , industry TEXT , invoice_number TEXT + , net_usd TEXT , online_name TEXT , online_or_offline TEXT , pos_insert_date TEXT , pos_invoice_date TEXT , pos_update_date TEXT , product_ap TEXT + , product_family_code TEXT + , product_family_name TEXT , product_qty TEXT , province TEXT , sales_name TEXT diff --git a/dev/workflow/TK_Cust/partner1site/partner_summary_pos/sa_tables.sql b/dev/workflow/TK_Cust/partner1site/partner_summary_pos/sa_tables.sql index 90bbdce..c3d47e2 100644 --- a/dev/workflow/TK_Cust/partner1site/partner_summary_pos/sa_tables.sql +++ b/dev/workflow/TK_Cust/partner1site/partner_summary_pos/sa_tables.sql @@ -13,6 +13,7 @@ create table if not exists p10_sa.S98_S_partner_summary_pos ( , contact_phone TEXT , contact_title TEXT , customer_chanel_name TEXT + , customer_id TEXT , customer_name TEXT , distributor_name TEXT , distributor_name2 TEXT @@ -20,12 +21,15 @@ create table if not exists p10_sa.S98_S_partner_summary_pos ( , id TEXT , industry TEXT , invoice_number TEXT + , net_usd TEXT , online_name TEXT , online_or_offline TEXT , pos_insert_date TEXT , pos_invoice_date TEXT , pos_update_date TEXT , product_ap TEXT + , product_family_code TEXT + , product_family_name TEXT , product_qty TEXT , province TEXT , sales_name TEXT @@ -52,6 +56,7 @@ create table if not exists p10_sa.S98_S_partner_summary_pos ( COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.contact_phone IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.contact_title IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.customer_chanel_name IS ''; + COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.customer_id IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.customer_name IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.distributor_name IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.distributor_name2 IS ''; @@ -59,12 +64,15 @@ create table if not exists p10_sa.S98_S_partner_summary_pos ( COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.id IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.industry IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.invoice_number IS ''; + COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.net_usd IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.online_name IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.online_or_offline IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.pos_insert_date IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.pos_invoice_date IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.pos_update_date IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.product_ap IS ''; + COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.product_family_code IS ''; + COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.product_family_name IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.product_qty IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.province IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_pos.sales_name IS ''; @@ -95,6 +103,7 @@ create table if not exists p12_sfull.S98_S_partner_summary_pos ( , contact_phone TEXT , contact_title TEXT , customer_chanel_name TEXT + , customer_id TEXT , customer_name TEXT , distributor_name TEXT , distributor_name2 TEXT @@ -102,12 +111,15 @@ create table if not exists p12_sfull.S98_S_partner_summary_pos ( , id TEXT , industry TEXT , invoice_number TEXT + , net_usd TEXT , online_name TEXT , online_or_offline TEXT , pos_insert_date TEXT , pos_invoice_date TEXT , pos_update_date TEXT , product_ap TEXT + , product_family_code TEXT + , product_family_name TEXT , product_qty TEXT , province TEXT , sales_name TEXT @@ -134,6 +146,7 @@ create table if not exists p12_sfull.S98_S_partner_summary_pos ( COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.contact_phone IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.contact_title IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.customer_chanel_name IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.customer_id IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.customer_name IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.distributor_name IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.distributor_name2 IS ''; @@ -141,12 +154,15 @@ create table if not exists p12_sfull.S98_S_partner_summary_pos ( COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.id IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.industry IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.invoice_number IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.net_usd IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.online_name IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.online_or_offline IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.pos_insert_date IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.pos_invoice_date IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.pos_update_date IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.product_ap IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.product_family_code IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.product_family_name IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.product_qty IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.province IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_pos.sales_name IS ''; diff --git a/dev/workflow/TK_Cust/partner1site/partner_summary_visit/S98_S_partner_summary_visit.sql b/dev/workflow/TK_Cust/partner1site/partner_summary_visit/S98_S_partner_summary_visit.sql index a6fdca8..884a5ba 100644 --- a/dev/workflow/TK_Cust/partner1site/partner_summary_visit/S98_S_partner_summary_visit.sql +++ b/dev/workflow/TK_Cust/partner1site/partner_summary_visit/S98_S_partner_summary_visit.sql @@ -33,6 +33,7 @@ insert into p10_sa.S98_S_partner_summary_visit , like_number , sub_industry , tsm_names_by_alias + , update_date , visit_remark , visitor , etl_tx_dt ) @@ -65,6 +66,7 @@ insert into p10_sa.S98_S_partner_summary_visit , like_number , sub_industry , tsm_names_by_alias + , update_date , visit_remark , visitor , etl_tx_dt @@ -102,6 +104,7 @@ insert into p12_sfull.S98_S_partner_summary_visit , like_number , sub_industry , tsm_names_by_alias + , update_date , visit_remark , visitor , etl_tx_dt ) @@ -134,6 +137,7 @@ insert into p12_sfull.S98_S_partner_summary_visit , like_number , sub_industry , tsm_names_by_alias + , update_date , visit_remark , visitor , etl_tx_dt diff --git a/dev/workflow/TK_Cust/partner1site/partner_summary_visit/sa_foreign_tables.sql b/dev/workflow/TK_Cust/partner1site/partner_summary_visit/sa_foreign_tables.sql index 2c68c8b..3dcfd7f 100644 --- a/dev/workflow/TK_Cust/partner1site/partner_summary_visit/sa_foreign_tables.sql +++ b/dev/workflow/TK_Cust/partner1site/partner_summary_visit/sa_foreign_tables.sql @@ -28,6 +28,7 @@ CREATE FOREIGN TABLE if not exists p00_tal.S98_S_partner_summary_visit ( , like_number TEXT , sub_industry TEXT , tsm_names_by_alias TEXT + , update_date TEXT , visit_remark TEXT , visitor TEXT , etl_tx_dt TIMESTAMP diff --git a/dev/workflow/TK_Cust/partner1site/partner_summary_visit/sa_tables.sql b/dev/workflow/TK_Cust/partner1site/partner_summary_visit/sa_tables.sql index c8dff5a..d74bc16 100644 --- a/dev/workflow/TK_Cust/partner1site/partner_summary_visit/sa_tables.sql +++ b/dev/workflow/TK_Cust/partner1site/partner_summary_visit/sa_tables.sql @@ -28,6 +28,7 @@ create table if not exists p10_sa.S98_S_partner_summary_visit ( , like_number TEXT , sub_industry TEXT , tsm_names_by_alias TEXT + , update_date TEXT , visit_remark TEXT , visitor TEXT , etl_tx_dt TIMESTAMP @@ -61,6 +62,7 @@ create table if not exists p10_sa.S98_S_partner_summary_visit ( COMMENT ON COLUMN p10_sa.S98_S_partner_summary_visit.like_number IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_visit.sub_industry IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_visit.tsm_names_by_alias IS ''; + COMMENT ON COLUMN p10_sa.S98_S_partner_summary_visit.update_date IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_visit.visit_remark IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_visit.visitor IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_visit.etl_tx_dt IS ''; @@ -98,6 +100,7 @@ create table if not exists p12_sfull.S98_S_partner_summary_visit ( , like_number TEXT , sub_industry TEXT , tsm_names_by_alias TEXT + , update_date TEXT , visit_remark TEXT , visitor TEXT , etl_tx_dt TIMESTAMP @@ -131,6 +134,7 @@ create table if not exists p12_sfull.S98_S_partner_summary_visit ( COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_visit.like_number IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_visit.sub_industry IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_visit.tsm_names_by_alias IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_visit.update_date IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_visit.visit_remark IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_visit.visitor IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_visit.etl_tx_dt IS ''; diff --git a/dev/workflow/TK_Cust/partner1site/获取POS数据/part_summary_pos_feign.py b/dev/workflow/TK_Cust/partner1site/获取POS数据/part_summary_pos_feign.py index 02b8930..3735db5 100644 --- a/dev/workflow/TK_Cust/partner1site/获取POS数据/part_summary_pos_feign.py +++ b/dev/workflow/TK_Cust/partner1site/获取POS数据/part_summary_pos_feign.py @@ -20,14 +20,6 @@ import psycopg2 ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc" SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5" -# URL 用占位符(外部替换) -BASE_URLS = { - "visits": "http://onesite.tek.cn/api/summary/pos_datas", # 客户拜访数据 - "reports": "http://onesite.tek.cn/api/summary/pos_datas", # 报备数据 - "pos_datas": "http://onesite.tek.cn/api/summary/pos_datas", # POS数据 - "customer_and_contact_datas": "http://onesite.tek.cn/api/summary/pos_datas" # 客户及联系人数据 -} - PG_DSN = dict( database="dataops_db", user="dbuser_dba", @@ -80,7 +72,7 @@ class Partner1SiteClient: "page": page_num }) - resp = requests.get(base_url, params=params_with_paging, timeout=120) + resp = requests.get(base_url, params=params_with_paging, timeout=30) resp.raise_for_status() data_json = resp.json() @@ -139,10 +131,8 @@ def main(): client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY) data = client.fetch_all_pages( - api_name="pos_datas", - params={'startPosInsertDate':get_previous_date(7)} - # params={'startPosInsertDate':'2000-1-1'} - ) + api_name="visits",2000-01-01 + params={'startPosUpdateDate':'2000-01-01',} # 拉全量,若有默认时间限制可改成 params={'startPosUpdateDate':get_previous_date(7),} save_json_to_pg(data, API_ID) diff --git a/dev/workflow/TK_Cust/partner1site/获取POS数据/part_summary_pos_load.sql b/dev/workflow/TK_Cust/partner1site/获取POS数据/part_summary_pos_load.sql index 37df49d..36cda67 100644 --- a/dev/workflow/TK_Cust/partner1site/获取POS数据/part_summary_pos_load.sql +++ b/dev/workflow/TK_Cust/partner1site/获取POS数据/part_summary_pos_load.sql @@ -19,6 +19,7 @@ insert into data_api.partner_summary_pos ( , contact_phone , contact_title , customer_chanel_name + , customer_id , customer_name , distributor_name , distributor_name2 @@ -26,12 +27,15 @@ insert into data_api.partner_summary_pos ( , id , industry , invoice_number + , net_usd , online_name , online_or_offline , pos_insert_date , pos_invoice_date , pos_update_date , product_ap + , product_family_code + , product_family_name , product_qty , province , sales_name @@ -58,6 +62,7 @@ select , case when trim(both from contact_phone)='' then null else contact_phone::text end contact_phone , case when trim(both from contact_title)='' then null else contact_title::text end contact_title , case when trim(both from customer_chanel_name)='' then null else customer_chanel_name::text end customer_chanel_name + , case when trim(both from customer_id)='' then null else customer_id::text end customer_id , case when trim(both from customer_name)='' then null else customer_name::text end customer_name , case when trim(both from distributor_name)='' then null else distributor_name::text end distributor_name , case when trim(both from distributor_name2)='' then null else distributor_name2::text end distributor_name2 @@ -65,12 +70,15 @@ select , case when trim(both from id)='' then null else id::text end id , case when trim(both from industry)='' then null else industry::text end industry , case when trim(both from invoice_number)='' then null else invoice_number::text end invoice_number + , case when trim(both from net_usd)='' then null else net_usd::text end net_usd , case when trim(both from online_name)='' then null else online_name::text end online_name , case when trim(both from online_or_offline)='' then null else online_or_offline::text end online_or_offline , case when trim(both from pos_insert_date)='' then null else pos_insert_date::text end pos_insert_date , case when trim(both from pos_invoice_date)='' then null else pos_invoice_date::text end pos_invoice_date , case when trim(both from pos_update_date)='' then null else pos_update_date::text end pos_update_date , case when trim(both from product_ap)='' then null else product_ap::text end product_ap + , case when trim(both from product_family_code)='' then null else product_family_code::text end product_family_code + , case when trim(both from product_family_name)='' then null else product_family_name::text end product_family_name , case when trim(both from product_qty)='' then null else product_qty::text end product_qty , case when trim(both from province)='' then null else province::text end province , case when trim(both from sales_name)='' then null else sales_name::text end sales_name @@ -97,6 +105,7 @@ select , (json_array_elements(data::json)::json->>'contactPhone') contact_phone , (json_array_elements(data::json)::json->>'contactTitle') contact_title , (json_array_elements(data::json)::json->>'customerChanelName') customer_chanel_name + , (json_array_elements(data::json)::json->>'customerId') customer_id , (json_array_elements(data::json)::json->>'customerName') customer_name , (json_array_elements(data::json)::json->>'distributorName') distributor_name , (json_array_elements(data::json)::json->>'distributorName2') distributor_name2 @@ -104,12 +113,15 @@ select , (json_array_elements(data::json)::json->>'id') id , (json_array_elements(data::json)::json->>'industry') industry , (json_array_elements(data::json)::json->>'invoiceNumber') invoice_number + , (json_array_elements(data::json)::json->>'netUsd') net_usd , (json_array_elements(data::json)::json->>'onlineName') online_name , (json_array_elements(data::json)::json->>'onlineOrOffline') online_or_offline , (json_array_elements(data::json)::json->>'posInsertDate') pos_insert_date , (json_array_elements(data::json)::json->>'posInvoiceDate') pos_invoice_date , (json_array_elements(data::json)::json->>'posUpdateDate') pos_update_date , (json_array_elements(data::json)::json->>'productAp') product_ap + , (json_array_elements(data::json)::json->>'productFamilyCode') product_family_code + , (json_array_elements(data::json)::json->>'productFamilyName') product_family_name , (json_array_elements(data::json)::json->>'productQty') product_qty , (json_array_elements(data::json)::json->>'province') province , (json_array_elements(data::json)::json->>'salesName') sales_name diff --git a/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_feign.py b/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_feign.py index c49652c..6581b0d 100644 --- a/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_feign.py +++ b/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_feign.py @@ -1,143 +1,142 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Partner1site 全接口抓取脚本 -分页结束条件:hasNext == False -""" - -import random -import hmac -import hashlib -import base64 -import requests -import json -import uuid -from datetime import datetime, timezone, timedelta -from typing import Dict, Any -import psycopg2 - -# ======= 配置区 ======= -ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc" -SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5" - -PG_DSN = dict( - database="dataops_db", - user="dbuser_dba", - password="EmBRxnmmjnE3", - host="124.221.232.219", - port="5432" -) - -API_ID = "2460976d-00c1-47d9-84b2-33e66d68" # 外部传入 api_id,占位符 -# ====================== - - -class Partner1SiteClient: - """Partner1site API 客户端""" - - def __init__(self, access_key: str, secret_key: str): - self.ak = access_key - self.sk = secret_key - - @staticmethod - def urlsafe_b64encode(data: bytes) -> str: - return base64.urlsafe_b64encode(data).decode() - - def gen_token(self, expire_sec: int = 600) -> str: - """生成 API Token""" - random_num = str(random.randint(100000, 999999)) - deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec - parm_str = f"{random_num}:{deadline}" - enc_parm = self.urlsafe_b64encode(parm_str.encode()) - sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest() - enc_sign = self.urlsafe_b64encode(sign.hex().encode()) - return f"{self.ak}:{enc_sign}:{enc_parm}" - - def fetch_all_pages(self, api_name: str, params: Dict[str, Any]): - """通用分页请求(结束条件:hasNext == False)""" - # if api_name not in BASE_URLS: - # raise ValueError(f"未知 API 数据来源: {api_name}") - - base_url = 'http://onesite.tek.cn/api/summary/visits' - all_data = [] - page_num = 0 - page_size = 1000 # 固定每页大小 - - while True: - token = self.gen_token() - params_with_paging = dict(params) - params_with_paging.update({ - "token": token, - "size": page_size, - "page": page_num - }) - - resp = requests.get(base_url, params=params_with_paging, timeout=30) - resp.raise_for_status() - data_json = resp.json() - - if data_json.get("code") != 100 or not data_json.get("success", False): - raise RuntimeError(f"获取客户拜访数据 API 错误: {data_json.get('message')}") - - content = data_json.get("data", {}).get("content", []) - all_data.extend(content) - total_elements = data_json.get("data", {}).get("totalElements") - has_next = data_json.get("data", {}).get("hasNext", False) - - print(f"[获取客户拜访数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}") - - if not has_next: - break - - page_num += 1 - - return all_data - - -def save_json_to_pg(data: list, api_id: str) -> None: - """写入 PostgreSQL:软删历史 + 插入新数据""" - print(f"[save_to_pg] API={api_id} 写入 PG,记录数={len(data)}") - sql = """ - UPDATE data_api.api_data - SET is_loaded = '1' - WHERE api_id = %s; - - INSERT INTO data_api.api_data - (id, api_id, data, total_num, is_loaded, status, - request_tm, execute_tm, remark) - VALUES (%s, %s, %s, %s, '0', '0', - current_timestamp(0), current_timestamp(0), ''); - """ - try: - with psycopg2.connect(**PG_DSN) as conn: - with conn.cursor() as cur: - cur.execute(sql, - (api_id, - str(uuid.uuid4()), - api_id, - json.dumps(data, ensure_ascii=False), - len(data))) - conn.commit() - print(f"[save_to_pg] API={api_id} 写入完成") - except Exception as e: - raise RuntimeError(f"PG写入错误: {e}") - - -def get_previous_date(days: int = 0) -> str: - return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") - - -def main(): - client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY) - - data = client.fetch_all_pages( - api_name="visits", - # params={'startInsertDate':get_previous_date(7),} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)} - params={'startInsertDate':'2000-01-01',} - ) - save_json_to_pg(data, API_ID) - - - -if __name__ == "__main__": - main() +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Partner1site 全接口抓取脚本 +分页结束条件:hasNext == False +""" + +import random +import hmac +import hashlib +import base64 +import requests +import json +import uuid +from datetime import datetime, timezone, timedelta +from typing import Dict, Any +import psycopg2 + +# ======= 配置区 ======= +ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc" +SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5" + +PG_DSN = dict( + database="dataops_db", + user="dbuser_dba", + password="EmBRxnmmjnE3", + host="124.221.232.219", + port="5432" +) + +API_ID = "2460976d-00c1-47d9-84b2-33e66d68" # 外部传入 api_id,占位符 +# ====================== + + +class Partner1SiteClient: + """Partner1site API 客户端""" + + def __init__(self, access_key: str, secret_key: str): + self.ak = access_key + self.sk = secret_key + + @staticmethod + def urlsafe_b64encode(data: bytes) -> str: + return base64.urlsafe_b64encode(data).decode() + + def gen_token(self, expire_sec: int = 600) -> str: + """生成 API Token""" + random_num = str(random.randint(100000, 999999)) + deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec + parm_str = f"{random_num}:{deadline}" + enc_parm = self.urlsafe_b64encode(parm_str.encode()) + sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest() + enc_sign = self.urlsafe_b64encode(sign.hex().encode()) + return f"{self.ak}:{enc_sign}:{enc_parm}" + + def fetch_all_pages(self, api_name: str, params: Dict[str, Any]): + """通用分页请求(结束条件:hasNext == False)""" + # if api_name not in BASE_URLS: + # raise ValueError(f"未知 API 数据来源: {api_name}") + + base_url = 'http://onesite.tek.cn/api/summary/visits' + all_data = [] + page_num = 0 + page_size = 1000 # 固定每页大小 + + while True: + token = self.gen_token() + params_with_paging = dict(params) + params_with_paging.update({ + "token": token, + "size": page_size, + "page": page_num + }) + + resp = requests.get(base_url, params=params_with_paging, timeout=30) + resp.raise_for_status() + data_json = resp.json() + + if data_json.get("code") != 100 or not data_json.get("success", False): + raise RuntimeError(f"获取客户拜访数据 API 错误: {data_json.get('message')}") + + content = data_json.get("data", {}).get("content", []) + all_data.extend(content) + total_elements = data_json.get("data", {}).get("totalElements") + has_next = data_json.get("data", {}).get("hasNext", False) + + print(f"[获取客户拜访数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}") + + if not has_next: + break + + page_num += 1 + + return all_data + + +def save_json_to_pg(data: list, api_id: str) -> None: + """写入 PostgreSQL:软删历史 + 插入新数据""" + print(f"[save_to_pg] API={api_id} 写入 PG,记录数={len(data)}") + sql = """ + UPDATE data_api.api_data + SET is_loaded = '1' + WHERE api_id = %s; + + INSERT INTO data_api.api_data + (id, api_id, data, total_num, is_loaded, status, + request_tm, execute_tm, remark) + VALUES (%s, %s, %s, %s, '0', '0', + current_timestamp(0), current_timestamp(0), ''); + """ + try: + with psycopg2.connect(**PG_DSN) as conn: + with conn.cursor() as cur: + cur.execute(sql, + (api_id, + str(uuid.uuid4()), + api_id, + json.dumps(data, ensure_ascii=False), + len(data))) + conn.commit() + print(f"[save_to_pg] API={api_id} 写入完成") + except Exception as e: + raise RuntimeError(f"PG写入错误: {e}") + + +def get_previous_date(days: int = 0) -> str: + return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") + + +def main(): + client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY) + + data = client.fetch_all_pages( + api_name="visits", + params={'startUpdateDate':'2025-8-20',} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)} + ) + save_json_to_pg(data, API_ID) + + + +if __name__ == "__main__": + main() diff --git a/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_load.sql b/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_load.sql index e71e239..33e2e46 100644 --- a/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_load.sql +++ b/dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_load.sql @@ -34,6 +34,7 @@ insert into data_api.partner_summary_visit ( , like_number , sub_industry , tsm_names_by_alias + , update_date , visit_remark , visitor ,etl_tx_dt @@ -67,6 +68,7 @@ select , case when trim(both from like_number)='' then null else like_number::text end like_number , case when trim(both from sub_industry)='' then null else sub_industry::text end sub_industry , case when trim(both from tsm_names_by_alias)='' then null else tsm_names_by_alias::text end tsm_names_by_alias + , case when trim(both from update_date)='' then null else update_date::text end update_date , case when trim(both from visit_remark)='' then null else visit_remark::text end visit_remark , case when trim(both from visitor)='' then null else visitor::text end visitor ,etl_tx_dt @@ -100,6 +102,7 @@ select , (json_array_elements(data::json)::json->>'likeNumber') like_number , (json_array_elements(data::json)::json->>'subIndustry') sub_industry , (json_array_elements(data::json)::json->>'tsmNamesByAlias') tsm_names_by_alias + , (json_array_elements(data::json)::json->>'updateDate') update_date , (json_array_elements(data::json)::json->>'visitRemark') visit_remark , (json_array_elements(data::json)::json->>'visitor') visitor ,CURRENT_TIMESTAMP(0) etl_tx_dt diff --git a/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_feign.py b/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_feign.py index 202f402..8eb72bf 100644 --- a/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_feign.py +++ b/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_feign.py @@ -1,144 +1,142 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Partner1site 全接口抓取脚本 -分页结束条件:hasNext == False -""" - -import random -import hmac -import hashlib -import base64 -import requests -import json -import uuid -from datetime import datetime, timezone, timedelta -from typing import Dict, Any -import psycopg2 - -# ======= 配置区 ======= -ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc" -SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5" - -PG_DSN = dict( - database="dataops_db", - user="dbuser_dba", - password="EmBRxnmmjnE3", - host="124.221.232.219", - port="5432" -) - -API_ID = "89190c80-b241-4453-97ef-f0fbac2d" # 外部传入 api_id,占位符 -# ====================== - - -class Partner1SiteClient: - """Partner1site API 客户端""" - - def __init__(self, access_key: str, secret_key: str): - self.ak = access_key - self.sk = secret_key - - @staticmethod - def urlsafe_b64encode(data: bytes) -> str: - return base64.urlsafe_b64encode(data).decode() - - def gen_token(self, expire_sec: int = 600) -> str: - """生成 API Token""" - random_num = str(random.randint(100000, 999999)) - deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec - parm_str = f"{random_num}:{deadline}" - enc_parm = self.urlsafe_b64encode(parm_str.encode()) - sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest() - enc_sign = self.urlsafe_b64encode(sign.hex().encode()) - return f"{self.ak}:{enc_sign}:{enc_parm}" - - def fetch_all_pages(self, api_name: str, params: Dict[str, Any]): - """通用分页请求(结束条件:hasNext == False)""" - # if api_name not in BASE_URLS: - # raise ValueError(f"未知 API 数据来源: {api_name}") - - base_url = 'http://onesite.tek.cn/api/summary/reports' - all_data = [] - page_num = 0 - page_size = 1000 # 固定每页大小 - - while True: - token = self.gen_token() - params_with_paging = dict(params) - params_with_paging.update({ - "token": token, - "size": page_size, - "page": page_num - }) - - resp = requests.get(base_url, params=params_with_paging, timeout=30) - resp.raise_for_status() - data_json = resp.json() - - if data_json.get("code") != 100 or not data_json.get("success", False): - raise RuntimeError(f"获取报备数据 API 错误: {data_json.get('message')}") - - content = data_json.get("data", {}).get("content", []) - all_data.extend(content) - total_elements = data_json.get("data", {}).get("totalElements") - has_next = data_json.get("data", {}).get("hasNext", False) - - print(f"[获取报备数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}") - - if not has_next: - break - - page_num += 1 - - return all_data - - -def save_json_to_pg(data: list, api_id: str) -> None: - """写入 PostgreSQL:软删历史 + 插入新数据""" - print(f"[save_to_pg] API={api_id} 写入 PG,记录数={len(data)}") - sql = """ - UPDATE data_api.api_data - SET is_loaded = '1' - WHERE api_id = %s; - - INSERT INTO data_api.api_data - (id, api_id, data, total_num, is_loaded, status, - request_tm, execute_tm, remark) - VALUES (%s, %s, %s, %s, '0', '0', - current_timestamp(0), current_timestamp(0), ''); - """ - try: - with psycopg2.connect(**PG_DSN) as conn: - with conn.cursor() as cur: - cur.execute(sql, - (api_id, - str(uuid.uuid4()), - api_id, - json.dumps(data, ensure_ascii=False), - len(data))) - conn.commit() - print(f"[save_to_pg] API={api_id} 写入完成") - except Exception as e: - raise RuntimeError(f"PG写入错误: {e}") - - -def get_previous_date(days: int = 0) -> str: - return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") - - -def main(): - client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY) - - data = client.fetch_all_pages( - api_name="visits", - #params={'startApplyDate':get_previous_date(7),} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)} - params={'startApplyDate':'2000-01-01',} - ) - - save_json_to_pg(data, API_ID) - - - -if __name__ == "__main__": - main() +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Partner1site 全接口抓取脚本 +分页结束条件:hasNext == False +""" + +import random +import hmac +import hashlib +import base64 +import requests +import json +import uuid +from datetime import datetime, timezone, timedelta +from typing import Dict, Any +import psycopg2 + +# ======= 配置区 ======= +ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc" +SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5" + +PG_DSN = dict( + database="dataops_db", + user="dbuser_dba", + password="EmBRxnmmjnE3", + host="124.221.232.219", + port="5432" +) + +API_ID = "89190c80-b241-4453-97ef-f0fbac2d" # 外部传入 api_id,占位符 +# ====================== + + +class Partner1SiteClient: + """Partner1site API 客户端""" + + def __init__(self, access_key: str, secret_key: str): + self.ak = access_key + self.sk = secret_key + + @staticmethod + def urlsafe_b64encode(data: bytes) -> str: + return base64.urlsafe_b64encode(data).decode() + + def gen_token(self, expire_sec: int = 600) -> str: + """生成 API Token""" + random_num = str(random.randint(100000, 999999)) + deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec + parm_str = f"{random_num}:{deadline}" + enc_parm = self.urlsafe_b64encode(parm_str.encode()) + sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest() + enc_sign = self.urlsafe_b64encode(sign.hex().encode()) + return f"{self.ak}:{enc_sign}:{enc_parm}" + + def fetch_all_pages(self, api_name: str, params: Dict[str, Any]): + """通用分页请求(结束条件:hasNext == False)""" + # if api_name not in BASE_URLS: + # raise ValueError(f"未知 API 数据来源: {api_name}") + + base_url = 'http://onesite.tek.cn/api/summary/reports' + all_data = [] + page_num = 0 + page_size = 1000 # 固定每页大小 + + while True: + token = self.gen_token() + params_with_paging = dict(params) + params_with_paging.update({ + "token": token, + "size": page_size, + "page": page_num + }) + + resp = requests.get(base_url, params=params_with_paging, timeout=30) + resp.raise_for_status() + data_json = resp.json() + + if data_json.get("code") != 100 or not data_json.get("success", False): + raise RuntimeError(f"获取报备数据 API 错误: {data_json.get('message')}") + + content = data_json.get("data", {}).get("content", []) + all_data.extend(content) + total_elements = data_json.get("data", {}).get("totalElements") + has_next = data_json.get("data", {}).get("hasNext", False) + + print(f"[获取报备数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}") + + if not has_next: + break + + page_num += 1 + + return all_data + + +def save_json_to_pg(data: list, api_id: str) -> None: + """写入 PostgreSQL:软删历史 + 插入新数据""" + print(f"[save_to_pg] API={api_id} 写入 PG,记录数={len(data)}") + sql = """ + UPDATE data_api.api_data + SET is_loaded = '1' + WHERE api_id = %s; + + INSERT INTO data_api.api_data + (id, api_id, data, total_num, is_loaded, status, + request_tm, execute_tm, remark) + VALUES (%s, %s, %s, %s, '0', '0', + current_timestamp(0), current_timestamp(0), ''); + """ + try: + with psycopg2.connect(**PG_DSN) as conn: + with conn.cursor() as cur: + cur.execute(sql, + (api_id, + str(uuid.uuid4()), + api_id, + json.dumps(data, ensure_ascii=False), + len(data))) + conn.commit() + print(f"[save_to_pg] API={api_id} 写入完成") + except Exception as e: + raise RuntimeError(f"PG写入错误: {e}") + + +def get_previous_date(days: int = 0) -> str: + return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") + + +def main(): + client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY) + + data = client.fetch_all_pages( + api_name="visits", + params={'startUpdateDate':'2000-01-01',} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)} + ) + save_json_to_pg(data, API_ID) + + + +if __name__ == "__main__": + main()