diff --git a/dev/workflow/TK_Cust/partner1site/Partner报备数据表/t01_partner_report.sql b/dev/workflow/TK_Cust/partner1site/Partner报备数据表/t01_partner_report.sql index bd39373..de27108 100644 --- a/dev/workflow/TK_Cust/partner1site/Partner报备数据表/t01_partner_report.sql +++ b/dev/workflow/TK_Cust/partner1site/Partner报备数据表/t01_partner_report.sql @@ -8,10 +8,11 @@ CREATE TABLE IF NOT EXISTS p20_pdm.t01_partner_report ( , apply_admin_name varchar(100) , apply_phone varchar(50) , apply_email varchar(100) - , apply_date date + , apply_date date , check_admin_name varchar(50) , check_status_str varchar(50) - , check_date date + , check_date date + , customer_id varchar(50) , customer_name varchar(200) , way varchar(50) , trader_name varchar(200) @@ -30,14 +31,14 @@ CREATE TABLE IF NOT EXISTS p20_pdm.t01_partner_report ( , contact_phone varchar(50) , funding_situation varchar(100) , apply_situation varchar(100) - , complete_date date + , complete_date date , feature varchar(500) , advantage varchar(500) , benefit varchar(500) , progress varchar(20) , lost_reason_type varchar(100) , lost_reason varchar(200) - , update_date date + , update_date date , source varchar(100) , leads_num varchar(100) , delay varchar(20) @@ -54,7 +55,7 @@ CREATE TABLE IF NOT EXISTS p20_pdm.t01_partner_report ( , list_price decimal(24,2) , count int4 , rp_total_price decimal(24,2) - , end_date date + , end_date date , tsm_names_by_alias varchar(200) , alias varchar(100) , remark varchar(1000) @@ -79,6 +80,7 @@ CREATE TABLE IF NOT EXISTS p20_pdm.t01_partner_report ( COMMENT ON COLUMN p20_pdm.t01_partner_report.check_admin_name IS '审核销售经理'; COMMENT ON COLUMN p20_pdm.t01_partner_report.check_status_str IS '审核状态'; COMMENT ON COLUMN p20_pdm.t01_partner_report.check_date IS '审核时间'; + COMMENT ON COLUMN p20_pdm.t01_partner_report.customer_id IS '客户ID'; COMMENT ON COLUMN p20_pdm.t01_partner_report.customer_name IS '客户名称'; COMMENT ON COLUMN p20_pdm.t01_partner_report.way IS '采购方式及渠道'; COMMENT ON COLUMN p20_pdm.t01_partner_report.trader_name IS '贸易商名称'; diff --git a/dev/workflow/TK_Cust/partner1site/Partner报备数据表/t01_partner_report_agi.sql b/dev/workflow/TK_Cust/partner1site/Partner报备数据表/t01_partner_report_agi.sql index e57f181..09d4b11 100644 --- a/dev/workflow/TK_Cust/partner1site/Partner报备数据表/t01_partner_report_agi.sql +++ b/dev/workflow/TK_Cust/partner1site/Partner报备数据表/t01_partner_report_agi.sql @@ -4,11 +4,11 @@ /*Brilliance stems from wisdoms. */ /*************Head Section**************************************************************************/ /*Script Use: Periodically load data to :t01_partner_report(Partner报备数据表) */ -/*Create Date:2025-09-30 19:03:29 */ +/*Create Date:2025-10-16 18:25:02 */ /*SDM Developed By: dev */ /*SDM Developed Date: 2025-09-29 */ /*SDM Checked By: dev */ -/*SDM Checked Date: 2025-09-30 */ +/*SDM Checked Date: 2025-10-16 */ /*Script Developed By: dev */ /*Script Checked By: dev */ /*Source table 1: p10_sa.s98_s_partner_summary_report */ @@ -53,6 +53,7 @@ INSERT INTO t01_partner_report_agi_CUR_I ( ,check_admin_name /*审核销售经理*/ ,check_status_str /*审核状态*/ ,check_date /*审核时间*/ + ,customer_id /*客户ID*/ ,customer_name /*客户名称*/ ,way /*采购方式及渠道*/ ,trader_name /*贸易商名称*/ @@ -119,6 +120,7 @@ SELECT ,COALESCE(TRIM(CAST(p0.check_admin_name AS varchar(50))),'') /*check_admin_name*/ ,COALESCE(TRIM(CAST(p0.check_status_str AS varchar(50))),'') /*check_status_str*/ ,p0.check_date::date /*check_date*/ + ,COALESCE(TRIM(CAST(p0.customer_id AS varchar(50))),'') /*customer_id*/ ,COALESCE(TRIM(CAST(p0.customer_name AS varchar(200))),'') /*customer_name*/ ,COALESCE(TRIM(CAST(p0.way AS varchar(50))),'') /*way*/ ,COALESCE(TRIM(CAST(p0.trader_name AS varchar(200))),'') /*trader_name*/ @@ -190,6 +192,7 @@ FROM p10_sa.s98_s_partner_summary_report p0 ,check_admin_name /*审核销售经理*/ ,check_status_str /*审核状态*/ ,check_date /*审核时间*/ + ,customer_id /*客户ID*/ ,customer_name /*客户名称*/ ,way /*采购方式及渠道*/ ,trader_name /*贸易商名称*/ @@ -257,6 +260,7 @@ FROM p10_sa.s98_s_partner_summary_report p0 ,P1.check_admin_name /*审核销售经理*/ ,P1.check_status_str /*审核状态*/ ,P1.check_date /*审核时间*/ + ,P1.customer_id /*客户ID*/ ,P1.customer_name /*客户名称*/ ,P1.way /*采购方式及渠道*/ ,P1.trader_name /*贸易商名称*/ @@ -324,6 +328,7 @@ ON P1.report_num = P2.report_num AND P1.check_admin_name = P2.check_admin_name AND P1.check_status_str = P2.check_status_str AND P1.check_date = P2.check_date + AND P1.customer_id = P2.customer_id AND P1.customer_name = P2.customer_name AND P1.way = P2.way AND P1.trader_name = P2.trader_name @@ -382,6 +387,7 @@ WHERE P2.report_num IS NULL OR P2.check_admin_name IS NULL OR P2.check_status_str IS NULL OR P2.check_date IS NULL + OR P2.customer_id IS NULL OR P2.customer_name IS NULL OR P2.way IS NULL OR P2.trader_name IS NULL @@ -443,6 +449,7 @@ WHERE P2.report_num IS NULL ,check_admin_name /*审核销售经理*/ ,check_status_str /*审核状态*/ ,check_date /*审核时间*/ + ,customer_id /*客户ID*/ ,customer_name /*客户名称*/ ,way /*采购方式及渠道*/ ,trader_name /*贸易商名称*/ @@ -510,6 +517,7 @@ SELECT ,P1.check_admin_name /*审核销售经理*/ ,P1.check_status_str /*审核状态*/ ,P1.check_date /*审核时间*/ + ,P1.customer_id /*客户ID*/ ,P1.customer_name /*客户名称*/ ,P1.way /*采购方式及渠道*/ ,P1.trader_name /*贸易商名称*/ @@ -579,6 +587,7 @@ DO UPDATE SET ,check_admin_name=excluded.check_admin_name ,check_status_str=excluded.check_status_str ,check_date=excluded.check_date + ,customer_id=excluded.customer_id ,customer_name=excluded.customer_name ,way=excluded.way ,trader_name=excluded.trader_name diff --git a/dev/workflow/TK_Cust/partner1site/partner_summary_report/S98_S_partner_summary_report.sql b/dev/workflow/TK_Cust/partner1site/partner_summary_report/S98_S_partner_summary_report.sql index 32407fb..29049b4 100644 --- a/dev/workflow/TK_Cust/partner1site/partner_summary_report/S98_S_partner_summary_report.sql +++ b/dev/workflow/TK_Cust/partner1site/partner_summary_report/S98_S_partner_summary_report.sql @@ -22,6 +22,7 @@ insert into p10_sa.S98_S_partner_summary_report , contact_name , contact_phone , count + , customer_id , customer_name , dealer_name , delay @@ -81,6 +82,7 @@ insert into p10_sa.S98_S_partner_summary_report , contact_name , contact_phone , count + , customer_id , customer_name , dealer_name , delay @@ -145,6 +147,7 @@ insert into p12_sfull.S98_S_partner_summary_report , contact_name , contact_phone , count + , customer_id , customer_name , dealer_name , delay @@ -204,6 +207,7 @@ insert into p12_sfull.S98_S_partner_summary_report , contact_name , contact_phone , count + , customer_id , customer_name , dealer_name , delay diff --git a/dev/workflow/TK_Cust/partner1site/partner_summary_report/sa_foreign_tables.sql b/dev/workflow/TK_Cust/partner1site/partner_summary_report/sa_foreign_tables.sql index 24258e2..b508698 100644 --- a/dev/workflow/TK_Cust/partner1site/partner_summary_report/sa_foreign_tables.sql +++ b/dev/workflow/TK_Cust/partner1site/partner_summary_report/sa_foreign_tables.sql @@ -17,6 +17,7 @@ CREATE FOREIGN TABLE if not exists p00_tal.S98_S_partner_summary_report ( , contact_name TEXT , contact_phone TEXT , count TEXT + , customer_id TEXT , customer_name TEXT , dealer_name TEXT , delay TEXT diff --git a/dev/workflow/TK_Cust/partner1site/partner_summary_report/sa_tables.sql b/dev/workflow/TK_Cust/partner1site/partner_summary_report/sa_tables.sql index 2ba46b5..1243702 100644 --- a/dev/workflow/TK_Cust/partner1site/partner_summary_report/sa_tables.sql +++ b/dev/workflow/TK_Cust/partner1site/partner_summary_report/sa_tables.sql @@ -17,6 +17,7 @@ create table if not exists p10_sa.S98_S_partner_summary_report ( , contact_name TEXT , contact_phone TEXT , count TEXT + , customer_id TEXT , customer_name TEXT , dealer_name TEXT , delay TEXT @@ -77,6 +78,7 @@ create table if not exists p10_sa.S98_S_partner_summary_report ( COMMENT ON COLUMN p10_sa.S98_S_partner_summary_report.contact_name IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_report.contact_phone IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_report.count IS ''; + COMMENT ON COLUMN p10_sa.S98_S_partner_summary_report.customer_id IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_report.customer_name IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_report.dealer_name IS ''; COMMENT ON COLUMN p10_sa.S98_S_partner_summary_report.delay IS ''; @@ -141,6 +143,7 @@ create table if not exists p12_sfull.S98_S_partner_summary_report ( , contact_name TEXT , contact_phone TEXT , count TEXT + , customer_id TEXT , customer_name TEXT , dealer_name TEXT , delay TEXT @@ -201,6 +204,7 @@ create table if not exists p12_sfull.S98_S_partner_summary_report ( COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_report.contact_name IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_report.contact_phone IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_report.count IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_report.customer_id IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_report.customer_name IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_report.dealer_name IS ''; COMMENT ON COLUMN p12_sfull.S98_S_partner_summary_report.delay IS ''; diff --git a/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_feign.py b/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_feign.py index a879231..86381b6 100644 --- a/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_feign.py +++ b/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_feign.py @@ -1,145 +1,142 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Partner1site 全接口抓取脚本 -分页结束条件:hasNext == False -""" - -import random -import hmac -import hashlib -import base64 -import requests -import json -import uuid -from datetime import datetime, timezone, timedelta -from typing import Dict, Any -import psycopg2 - -# ======= 配置区 ======= -ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc" -SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5" - -PG_DSN = dict( - database="dataops_db", - user="dbuser_dba", - password="EmBRxnmmjnE3", - host="124.221.232.219", - port="5432" -) - -API_ID = "89190c80-b241-4453-97ef-f0fbac2d" # 外部传入 api_id,占位符 -# ====================== - - -class Partner1SiteClient: - """Partner1site API 客户端""" - - def __init__(self, access_key: str, secret_key: str): - self.ak = access_key - self.sk = secret_key - - @staticmethod - def urlsafe_b64encode(data: bytes) -> str: - return base64.urlsafe_b64encode(data).decode() - - def gen_token(self, expire_sec: int = 600) -> str: - """生成 API Token""" - random_num = str(random.randint(100000, 999999)) - deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec - parm_str = f"{random_num}:{deadline}" - enc_parm = self.urlsafe_b64encode(parm_str.encode()) - sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest() - enc_sign = self.urlsafe_b64encode(sign.hex().encode()) - return f"{self.ak}:{enc_sign}:{enc_parm}" - - def fetch_all_pages(self, api_name: str, params: Dict[str, Any]): - """通用分页请求(结束条件:hasNext == False)""" - # if api_name not in BASE_URLS: - # raise ValueError(f"未知 API 数据来源: {api_name}") - - base_url = 'http://onesite.tek.cn/api/summary/reports' - all_data = [] - page_num = 0 - page_size = 1000 # 固定每页大小 - - while True: - token = self.gen_token() - params_with_paging = dict(params) - params_with_paging.update({ - "token": token, - "size": page_size, - "page": page_num - }) - - resp = requests.get(base_url, params=params_with_paging, timeout=120) - resp.raise_for_status() - data_json = resp.json() - - if data_json.get("code") != 100 or not data_json.get("success", False): - raise RuntimeError(f"获取报备数据 API 错误: {data_json.get('message')}") - - content = data_json.get("data", {}).get("content", []) - all_data.extend(content) - total_elements = data_json.get("data", {}).get("totalElements") - has_next = data_json.get("data", {}).get("hasNext", False) - - print(f"[获取报备数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}") - - if not has_next: - break - - page_num += 1 - - return all_data - - -def save_json_to_pg(data: list, api_id: str) -> None: - """写入 PostgreSQL:软删历史 + 插入新数据""" - print(f"[save_to_pg] API={api_id} 写入 PG,记录数={len(data)}") - sql = """ - UPDATE data_api.api_data - SET is_loaded = '1' - WHERE api_id = %s; - - INSERT INTO data_api.api_data - (id, api_id, data, total_num, is_loaded, status, - request_tm, execute_tm, remark) - VALUES (%s, %s, %s, %s, '0', '0', - current_timestamp(0), current_timestamp(0), ''); - """ - try: - with psycopg2.connect(**PG_DSN) as conn: - with conn.cursor() as cur: - cur.execute(sql, - (api_id, - str(uuid.uuid4()), - api_id, - json.dumps(data, ensure_ascii=False), - len(data))) - conn.commit() - print(f"[save_to_pg] API={api_id} 写入完成") - except Exception as e: - raise RuntimeError(f"PG写入错误: {e}") - - -def get_previous_date(days: int = 0) -> str: - return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") - - -def main(): - client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY) - - data = client.fetch_all_pages( - api_name="reports", - params={'startApplyDate':get_previous_date(7)} - # params={'startApplyDate':'2000-1-1'} - ) - - - save_json_to_pg(data, API_ID) - - - -if __name__ == "__main__": - main() +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Partner1site 全接口抓取脚本 +分页结束条件:hasNext == False +""" + +import random +import hmac +import hashlib +import base64 +import requests +import json +import uuid +from datetime import datetime, timezone, timedelta +from typing import Dict, Any +import psycopg2 + +# ======= 配置区 ======= +ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc" +SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5" + +PG_DSN = dict( + database="dataops_db", + user="dbuser_dba", + password="EmBRxnmmjnE3", + host="124.221.232.219", + port="5432" +) + +API_ID = "89190c80-b241-4453-97ef-f0fbac2d" # 外部传入 api_id,占位符 +# ====================== + + +class Partner1SiteClient: + """Partner1site API 客户端""" + + def __init__(self, access_key: str, secret_key: str): + self.ak = access_key + self.sk = secret_key + + @staticmethod + def urlsafe_b64encode(data: bytes) -> str: + return base64.urlsafe_b64encode(data).decode() + + def gen_token(self, expire_sec: int = 600) -> str: + """生成 API Token""" + random_num = str(random.randint(100000, 999999)) + deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec + parm_str = f"{random_num}:{deadline}" + enc_parm = self.urlsafe_b64encode(parm_str.encode()) + sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest() + enc_sign = self.urlsafe_b64encode(sign.hex().encode()) + return f"{self.ak}:{enc_sign}:{enc_parm}" + + def fetch_all_pages(self, api_name: str, params: Dict[str, Any]): + """通用分页请求(结束条件:hasNext == False)""" + # if api_name not in BASE_URLS: + # raise ValueError(f"未知 API 数据来源: {api_name}") + + base_url = 'http://onesite.tek.cn/api/summary/reports' + all_data = [] + page_num = 0 + page_size = 1000 # 固定每页大小 + + while True: + token = self.gen_token() + params_with_paging = dict(params) + params_with_paging.update({ + "token": token, + "size": page_size, + "page": page_num + }) + + resp = requests.get(base_url, params=params_with_paging, timeout=30) + resp.raise_for_status() + data_json = resp.json() + + if data_json.get("code") != 100 or not data_json.get("success", False): + raise RuntimeError(f"获取报备数据 API 错误: {data_json.get('message')}") + + content = data_json.get("data", {}).get("content", []) + all_data.extend(content) + total_elements = data_json.get("data", {}).get("totalElements") + has_next = data_json.get("data", {}).get("hasNext", False) + + print(f"[获取报备数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}") + + if not has_next: + break + + page_num += 1 + + return all_data + + +def save_json_to_pg(data: list, api_id: str) -> None: + """写入 PostgreSQL:软删历史 + 插入新数据""" + print(f"[save_to_pg] API={api_id} 写入 PG,记录数={len(data)}") + sql = """ + UPDATE data_api.api_data + SET is_loaded = '1' + WHERE api_id = %s; + + INSERT INTO data_api.api_data + (id, api_id, data, total_num, is_loaded, status, + request_tm, execute_tm, remark) + VALUES (%s, %s, %s, %s, '0', '0', + current_timestamp(0), current_timestamp(0), ''); + """ + try: + with psycopg2.connect(**PG_DSN) as conn: + with conn.cursor() as cur: + cur.execute(sql, + (api_id, + str(uuid.uuid4()), + api_id, + json.dumps(data, ensure_ascii=False), + len(data))) + conn.commit() + print(f"[save_to_pg] API={api_id} 写入完成") + except Exception as e: + raise RuntimeError(f"PG写入错误: {e}") + + +def get_previous_date(days: int = 0) -> str: + return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") + + +def main(): + client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY) + + data = client.fetch_all_pages( + api_name="visits", + params={'startApplyDate':'2000-01-01',} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)} + ) + save_json_to_pg(data, API_ID) + + + +if __name__ == "__main__": + main() diff --git a/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_load.sql b/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_load.sql index e8aeb2b..6f76db4 100644 --- a/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_load.sql +++ b/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_load.sql @@ -23,6 +23,7 @@ insert into data_api.partner_summary_report ( , contact_name , contact_phone , count + , customer_id , customer_name , dealer_name , delay @@ -83,6 +84,7 @@ select , case when trim(both from contact_name)='' then null else contact_name::text end contact_name , case when trim(both from contact_phone)='' then null else contact_phone::text end contact_phone , case when trim(both from count)='' then null else count::text end count + , case when trim(both from customer_id)='' then null else customer_id::text end customer_id , case when trim(both from customer_name)='' then null else customer_name::text end customer_name , case when trim(both from dealer_name)='' then null else dealer_name::text end dealer_name , case when trim(both from delay)='' then null else delay::text end delay @@ -143,6 +145,7 @@ select , (json_array_elements(data::json)::json->>'contactName') contact_name , (json_array_elements(data::json)::json->>'contactPhone') contact_phone , (json_array_elements(data::json)::json->>'count') count + , (json_array_elements(data::json)::json->>'customerId') customer_id , (json_array_elements(data::json)::json->>'customerName') customer_name , (json_array_elements(data::json)::json->>'dealerName') dealer_name , (json_array_elements(data::json)::json->>'delay') delay