diff --git a/dev/workflow/TK_Cust/partner1site/partner1site/wf_dag_partner1site.py b/dev/workflow/TK_Cust/partner1site/partner1site/wf_dag_partner1site.py index 1ff559a..ca79d6f 100644 --- a/dev/workflow/TK_Cust/partner1site/partner1site/wf_dag_partner1site.py +++ b/dev/workflow/TK_Cust/partner1site/partner1site/wf_dag_partner1site.py @@ -66,5 +66,25 @@ depends_on_past=False, retries=3, dag=dag) +part_summary_report_feign = SSHOperator( +ssh_hook=sshHook, +task_id='part_summary_report_feign', +command='python3 /data/airflow/etl/API/part_summary_report_feign.py', +depends_on_past=False, +retries=3, +dag=dag) + +part_summary_report_load = SSHOperator( +ssh_hook=sshHook, +task_id='part_summary_report_load', +command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}', +params={'my_param':"part_summary_report_load"}, +depends_on_past=False, +retries=3, +dag=dag) + +part_summary_report_feign >> part_summary_report_load + part_summary_visit_load >> partner_summary_visit_9060 partner_summary_visit_9060 >> task_failed +part_summary_report_load >> task_failed diff --git a/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_feign.py b/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_feign.py new file mode 100644 index 0000000..e125f8f --- /dev/null +++ b/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_feign.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Partner1site 全接口抓取脚本 +分页结束条件:hasNext == False +""" + +import random +import hmac +import hashlib +import base64 +import requests +import json +import uuid +from datetime import datetime, timezone, timedelta +from typing import Dict, Any +import psycopg2 + +# ======= 配置区 ======= +ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc" +SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5" + +# URL 用占位符(外部替换) +BASE_URLS = { + # "visits": "http://onesite.tek.cn/api/summary/reports", # 客户拜访数据 + "reports": "http://onesite.tek.cn/api/summary/reports" # 报备数据 + # "pos_datas": "http://onesite.tek.cn/api/summary/reports", # POS数据 + # "customer_and_contact_datas": "http://onesite.tek.cn/api/summary/reports" # 客户及联系人数据 +} + +PG_DSN = dict( + database="dataops_db", + user="dbuser_dba", + password="EmBRxnmmjnE3", + host="124.221.232.219", + port="5432" +) + +API_ID = "89190c80-b241-4453-97ef-f0fbac2d" # 外部传入 api_id,占位符 +# ====================== + + +class Partner1SiteClient: + """Partner1site API 客户端""" + + def __init__(self, access_key: str, secret_key: str): + self.ak = access_key + self.sk = secret_key + + @staticmethod + def urlsafe_b64encode(data: bytes) -> str: + return base64.urlsafe_b64encode(data).decode() + + def gen_token(self, expire_sec: int = 600) -> str: + """生成 API Token""" + random_num = str(random.randint(100000, 999999)) + deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec + parm_str = f"{random_num}:{deadline}" + enc_parm = self.urlsafe_b64encode(parm_str.encode()) + sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest() + enc_sign = self.urlsafe_b64encode(sign.hex().encode()) + return f"{self.ak}:{enc_sign}:{enc_parm}" + + def fetch_all_pages(self, api_name: str, params: Dict[str, Any]): + """通用分页请求(结束条件:hasNext == False)""" + if api_name not in BASE_URLS: + raise ValueError(f"未知 API 数据来源: {api_name}") + + base_url = BASE_URLS[api_name] + all_data = [] + page_num = 0 + page_size = 50 # 固定每页大小 + + while True: + token = self.gen_token() + params_with_paging = dict(params) + params_with_paging.update({ + "token": token, + "size": page_size, + "page": page_num + }) + + resp = requests.get(base_url, params=params_with_paging, timeout=30) + resp.raise_for_status() + data_json = resp.json() + + if data_json.get("code") != 100 or not data_json.get("success", False): + raise RuntimeError(f"{api_name} API 错误: {data_json.get('message')}") + + content = data_json.get("data", {}).get("content", []) + all_data.extend(content) + total_elements = data_json.get("data", {}).get("totalElements") + has_next = data_json.get("data", {}).get("hasNext", False) + + print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}") + + if not has_next: + break + + page_num += 1 + + return all_data + + +def save_json_to_pg(data: list, api_id: str) -> None: + """写入 PostgreSQL:软删历史 + 插入新数据""" + print(f"[save_to_pg] API={api_id} 写入 PG,记录数={len(data)}") + sql = """ + UPDATE data_api.api_data + SET is_loaded = '1' + WHERE api_id = %s; + + INSERT INTO data_api.api_data + (id, api_id, data, total_num, is_loaded, status, + request_tm, execute_tm, remark) + VALUES (%s, %s, %s, %s, '0', '0', + current_timestamp(0), current_timestamp(0), ''); + """ + try: + with psycopg2.connect(**PG_DSN) as conn: + with conn.cursor() as cur: + cur.execute(sql, + (api_id, + str(uuid.uuid4()), + api_id, + json.dumps(data, ensure_ascii=False), + len(data))) + conn.commit() + print(f"[save_to_pg] API={api_id} 写入完成") + except Exception as e: + raise RuntimeError(f"PG写入错误: {e}") + + +def get_previous_date(days: int = 0) -> str: + return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d") + + +def main(): + client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY) + + # ✅ 客户拜访数据(执行) + """ + reports_data = client.fetch_all_pages( + api_name="reports", + params={} + ) + save_json_to_pg(reports_data, API_ID) + + # ❌ POS 数据(暂时注释) + """ + visits_data = client.fetch_all_pages( + api_name="visits", + params={} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)} + ) + save_json_to_pg(visits_data, API_ID) + """ + + + # ❌ POS 数据(暂时注释) + """ + pos_data = client.fetch_all_pages( + api_name="pos_datas", + params={} + ) + save_json_to_pg(pos_data, API_ID) + """ + + # ❌ 客户及联系人数据(暂时注释) + """ + cust_contact_data = client.fetch_all_pages( + api_name="customer_and_contact_datas", + params={} + ) + save_json_to_pg(cust_contact_data, API_ID) + """ + + +if __name__ == "__main__": + main() diff --git a/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_load.sql b/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_load.sql new file mode 100644 index 0000000..0265080 --- /dev/null +++ b/dev/workflow/TK_Cust/partner1site/获取报备数据/part_summary_report_load.sql @@ -0,0 +1,195 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +DELETE FROM data_api.partner_summary_report; + +insert into data_api.partner_summary_report ( + accepted + , address + , advantage + , alias + , apply_admin_name + , apply_date + , apply_email + , apply_phone + , apply_situation + , benefit + , check_admin_name + , check_date + , check_status_str + , city + , complete_date + , contact_name + , contact_phone + , count + , customer_name + , dealer_name + , delay + , delay_reason + , department + , department_attr + , district + , end_date + , feature + , funding_situation + , industry + , integrated_sales + , leads_num + , list_price + , lost_reason + , lost_reason_type + , other_remark + , package_name + , package_type_name + , product_name + , product_sku_type + , product_type + , progress + , province + , region_name + , remark + , report_num + , rp_total_price + , series_name + , skus + , source + , sub_industry + , total_price + , trader_contact_name + , trader_contact_phone + , trader_name + , tsm_names_by_alias + , update_date + , way + ,etl_tx_dt +) +select + case when trim(both from accepted)='' then null else accepted::text end accepted + , case when trim(both from address)='' then null else address::text end address + , case when trim(both from advantage)='' then null else advantage::text end advantage + , case when trim(both from alias)='' then null else alias::text end alias + , case when trim(both from apply_admin_name)='' then null else apply_admin_name::text end apply_admin_name + , case when trim(both from apply_date)='' then null else apply_date::text end apply_date + , case when trim(both from apply_email)='' then null else apply_email::text end apply_email + , case when trim(both from apply_phone)='' then null else apply_phone::text end apply_phone + , case when trim(both from apply_situation)='' then null else apply_situation::text end apply_situation + , case when trim(both from benefit)='' then null else benefit::text end benefit + , case when trim(both from check_admin_name)='' then null else check_admin_name::text end check_admin_name + , case when trim(both from check_date)='' then null else check_date::text end check_date + , case when trim(both from check_status_str)='' then null else check_status_str::text end check_status_str + , case when trim(both from city)='' then null else city::text end city + , case when trim(both from complete_date)='' then null else complete_date::text end complete_date + , case when trim(both from contact_name)='' then null else contact_name::text end contact_name + , case when trim(both from contact_phone)='' then null else contact_phone::text end contact_phone + , case when trim(both from count)='' then null else count::text end count + , case when trim(both from customer_name)='' then null else customer_name::text end customer_name + , case when trim(both from dealer_name)='' then null else dealer_name::text end dealer_name + , case when trim(both from delay)='' then null else delay::text end delay + , case when trim(both from delay_reason)='' then null else delay_reason::text end delay_reason + , case when trim(both from department)='' then null else department::text end department + , case when trim(both from department_attr)='' then null else department_attr::text end department_attr + , case when trim(both from district)='' then null else district::text end district + , case when trim(both from end_date)='' then null else end_date::text end end_date + , case when trim(both from feature)='' then null else feature::text end feature + , case when trim(both from funding_situation)='' then null else funding_situation::text end funding_situation + , case when trim(both from industry)='' then null else industry::text end industry + , case when trim(both from integrated_sales)='' then null else integrated_sales::text end integrated_sales + , case when trim(both from leads_num)='' then null else leads_num::text end leads_num + , case when trim(both from list_price)='' then null else list_price::text end list_price + , case when trim(both from lost_reason)='' then null else lost_reason::text end lost_reason + , case when trim(both from lost_reason_type)='' then null else lost_reason_type::text end lost_reason_type + , case when trim(both from other_remark)='' then null else other_remark::text end other_remark + , case when trim(both from package_name)='' then null else package_name::text end package_name + , case when trim(both from package_type_name)='' then null else package_type_name::text end package_type_name + , case when trim(both from product_name)='' then null else product_name::text end product_name + , case when trim(both from product_sku_type)='' then null else product_sku_type::text end product_sku_type + , case when trim(both from product_type)='' then null else product_type::text end product_type + , case when trim(both from progress)='' then null else progress::text end progress + , case when trim(both from province)='' then null else province::text end province + , case when trim(both from region_name)='' then null else region_name::text end region_name + , case when trim(both from remark)='' then null else remark::text end remark + , case when trim(both from report_num)='' then null else report_num::text end report_num + , case when trim(both from rp_total_price)='' then null else rp_total_price::text end rp_total_price + , case when trim(both from series_name)='' then null else series_name::text end series_name + , case when trim(both from skus)='' then null else skus::text end skus + , case when trim(both from source)='' then null else source::text end source + , case when trim(both from sub_industry)='' then null else sub_industry::text end sub_industry + , case when trim(both from total_price)='' then null else total_price::text end total_price + , case when trim(both from trader_contact_name)='' then null else trader_contact_name::text end trader_contact_name + , case when trim(both from trader_contact_phone)='' then null else trader_contact_phone::text end trader_contact_phone + , case when trim(both from trader_name)='' then null else trader_name::text end trader_name + , case when trim(both from tsm_names_by_alias)='' then null else tsm_names_by_alias::text end tsm_names_by_alias + , case when trim(both from update_date)='' then null else update_date::text end update_date + , case when trim(both from way)='' then null else way::text end way +,etl_tx_dt +from ( +select + (json_array_elements(data::json)::json->>'accepted') accepted + , (json_array_elements(data::json)::json->>'address') address + , (json_array_elements(data::json)::json->>'advantage') advantage + , (json_array_elements(data::json)::json->>'alias') alias + , (json_array_elements(data::json)::json->>'applyAdminName') apply_admin_name + , (json_array_elements(data::json)::json->>'applyDate') apply_date + , (json_array_elements(data::json)::json->>'applyEmail') apply_email + , (json_array_elements(data::json)::json->>'applyPhone') apply_phone + , (json_array_elements(data::json)::json->>'applySituation') apply_situation + , (json_array_elements(data::json)::json->>'benefit') benefit + , (json_array_elements(data::json)::json->>'checkAdminName') check_admin_name + , (json_array_elements(data::json)::json->>'checkDate') check_date + , (json_array_elements(data::json)::json->>'checkStatusStr') check_status_str + , (json_array_elements(data::json)::json->>'city') city + , (json_array_elements(data::json)::json->>'completeDate') complete_date + , (json_array_elements(data::json)::json->>'contactName') contact_name + , (json_array_elements(data::json)::json->>'contactPhone') contact_phone + , (json_array_elements(data::json)::json->>'count') count + , (json_array_elements(data::json)::json->>'customerName') customer_name + , (json_array_elements(data::json)::json->>'dealerName') dealer_name + , (json_array_elements(data::json)::json->>'delay') delay + , (json_array_elements(data::json)::json->>'delayReason') delay_reason + , (json_array_elements(data::json)::json->>'department') department + , (json_array_elements(data::json)::json->>'departmentAttr') department_attr + , (json_array_elements(data::json)::json->>'district') district + , (json_array_elements(data::json)::json->>'endDate') end_date + , (json_array_elements(data::json)::json->>'feature') feature + , (json_array_elements(data::json)::json->>'fundingSituation') funding_situation + , (json_array_elements(data::json)::json->>'industry') industry + , (json_array_elements(data::json)::json->>'integratedSales') integrated_sales + , (json_array_elements(data::json)::json->>'leadsNum') leads_num + , (json_array_elements(data::json)::json->>'listPrice') list_price + , (json_array_elements(data::json)::json->>'lostReason') lost_reason + , (json_array_elements(data::json)::json->>'lostReasonType') lost_reason_type + , (json_array_elements(data::json)::json->>'otherRemark') other_remark + , (json_array_elements(data::json)::json->>'packageName') package_name + , (json_array_elements(data::json)::json->>'packageTypeName') package_type_name + , (json_array_elements(data::json)::json->>'productName') product_name + , (json_array_elements(data::json)::json->>'productSkuType') product_sku_type + , (json_array_elements(data::json)::json->>'productType') product_type + , (json_array_elements(data::json)::json->>'progress') progress + , (json_array_elements(data::json)::json->>'province') province + , (json_array_elements(data::json)::json->>'regionName') region_name + , (json_array_elements(data::json)::json->>'remark') remark + , (json_array_elements(data::json)::json->>'reportNum') report_num + , (json_array_elements(data::json)::json->>'rpTotalPrice') rp_total_price + , (json_array_elements(data::json)::json->>'seriesName') series_name + , (json_array_elements(data::json)::json->>'skus') skus + , (json_array_elements(data::json)::json->>'source') source + , (json_array_elements(data::json)::json->>'subIndustry') sub_industry + , (json_array_elements(data::json)::json->>'totalPrice') total_price + , (json_array_elements(data::json)::json->>'traderContactName') trader_contact_name + , (json_array_elements(data::json)::json->>'traderContactPhone') trader_contact_phone + , (json_array_elements(data::json)::json->>'traderName') trader_name + , (json_array_elements(data::json)::json->>'tsmNamesByAlias') tsm_names_by_alias + , (json_array_elements(data::json)::json->>'updateDate') update_date + , (json_array_elements(data::json)::json->>'way') way + ,CURRENT_TIMESTAMP(0) etl_tx_dt +from (select * from data_api.api_data +WHERE api_id='89190c80-b241-4453-97ef-f0fbac2d' and is_loaded = '0' order by request_tm desc limit 1) p )p; + +update data_api.api_data +set is_loaded = '1' , + status = '1', + request_tm = current_timestamp(0) +where api_id='89190c80-b241-4453-97ef-f0fbac2d'; +\q \ No newline at end of file