# --- dev/workflow/TK_Cust/partner1site/partner1site/wf_dag_partner1site.py (new file) ---
#!/usr/bin/python
# -*- encoding=utf-8 -*-
"""Airflow DAG `wf_dag_partner1site`.

Daily (01:00) pipeline: pull partner1site customer-visit data via the API
script, then load the staged JSON into PostgreSQL via psql; an e-mail task
fires if any upstream task fails.
"""
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import json

from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule


sshHook = SSHHook(ssh_conn_id='ssh_air')

default_args = {
    'owner': 'tek_newsletter@163.com',
    # FIX: email_on_failure/email_on_retry need an explicit recipient list,
    # otherwise Airflow has nobody to notify.
    'email': ['tek_newsletter@163.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'start_date': datetime(2024, 1, 1),
    'depends_on_past': False,
    'retries': 6,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG(
    'wf_dag_partner1site',
    default_args=default_args,
    schedule_interval="0 1 * * *",          # daily at 01:00
    catchup=False,
    dagrun_timeout=timedelta(minutes=160),
    max_active_runs=3,
)

# Failure notification: ONE_FAILED makes this run when any upstream fails.
task_failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="task_failed",
    to=["tek_newsletter@163.com"],
    # FIX: was cc=[""] — an empty address string breaks some mail backends.
    cc=[],
    subject="partner1site_failed",
    # FIX: the original html_content was a single-quoted literal spanning
    # several physical lines (a Python SyntaxError); use a triple-quoted
    # string instead.
    html_content="""
您好,partner1site作业失败,请及时处理
""",
)

# Step 1: call the partner1site API and stage the JSON payload into PG.
part_summary_visit_feign = SSHOperator(
    ssh_hook=sshHook,
    task_id='part_summary_visit_feign',
    command='python3 /data/airflow/etl/API/part_summary_visit_feign.py',
    depends_on_past=False,
    retries=3,
    dag=dag,
)

# Step 2: load the staged JSON into the reporting table via psql.
part_summary_visit_load = SSHOperator(
    ssh_hook=sshHook,
    task_id='part_summary_visit_load',
    command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
    params={'my_param': "part_summary_visit_load"},
    depends_on_past=False,
    retries=3,
    dag=dag,
)

part_summary_visit_feign >> part_summary_visit_load
part_summary_visit_load >> task_failed


# --- dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_feign.py (new file) ---
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
partner1site multi-source API request module.
Supports automatic pagination and persists results to PostgreSQL.
"""

import random
import hmac
import hashlib
import base64
import requests
import json
import uuid
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any
import psycopg2

# ======= Configuration =======
# SECURITY NOTE(review): credentials and the DB password are hard-coded in a
# version-controlled file — move them to environment variables or an Airflow
# connection/secrets backend.
ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc"
SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5"

# URLs are placeholders, replaced externally.
BASE_URLS = {
    "visits": "http://onesite.tek.cn/api/summary/visits",  # customer-visit data
    # "reports": "http://onesite.tek.cn/api/summary/visits",  # filing/report data
    # "pos_datas": "http://onesite.tek.cn/api/summary/visits",  # POS data
    # "customer_and_contact_datas": "http://onesite.tek.cn/api/summary/visits"  # customer & contact data
}

PG_DSN = dict(
    database="dataops_db",
    user="dbuser_dba",
    password="EmBRxnmmjnE3",
    host="124.221.232.219",
    port="5432",
)
# ======================
class Partner1SiteClient:
    """partner1site API client.

    Builds signed access tokens and walks the paginated endpoints declared
    in the module-level BASE_URLS table.
    """

    def __init__(self, access_key: str, secret_key: str):
        self.ak = access_key
        self.sk = secret_key

    @staticmethod
    def urlsafe_b64encode(data: bytes) -> str:
        """Return *data* URL-safe Base64-encoded, as text."""
        return base64.urlsafe_b64encode(data).decode()

    def gen_token(self, expire_sec: int = 600) -> str:
        """Generate an access token of the form ``<ak>:<enc_sign>:<enc_parm>``.

        NOTE: this is the algorithm that was originally verified against the
        server (HMAC-SHA1 over the Base64-encoded parameter string, digest
        hex-encoded and Base64-encoded again) — do not "simplify" it without
        re-testing against the live API.
        """
        random_num = str(random.randint(100000, 999999))
        deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec
        parm_str = f"{random_num}:{deadline}"
        # URL-safe Base64 encoding of the parameter string.
        enc_parm = self.urlsafe_b64encode(parm_str.encode())
        # HMAC-SHA1 signature computed over the *encoded* parameter string.
        sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest()
        # Hex-encode the digest, then URL-safe Base64 that hex text.
        enc_sign = self.urlsafe_b64encode(sign.hex().encode())
        return f"{self.ak}:{enc_sign}:{enc_parm}"

    def fetch_all_pages(self, api_name: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Fetch every page of *api_name* and return the concatenated rows.

        Raises ValueError for an unknown api_name, RuntimeError when the API
        reports a non-success payload, and requests.HTTPError on HTTP errors.
        """
        if api_name not in BASE_URLS:
            raise ValueError(f"未知 API 数据来源: {api_name}")

        base_url = BASE_URLS[api_name]
        all_data: List[Dict[str, Any]] = []
        page_num = 0
        page_size = 50  # fixed page capacity

        while True:
            # A fresh token per request keeps the deadline valid on long pulls.
            token = self.gen_token()
            params_with_paging = dict(params)
            params_with_paging.update({
                "token": token,
                "size": page_size,
                "page": page_num
            })

            resp = requests.get(base_url, params=params_with_paging, timeout=15)
            resp.raise_for_status()
            data_json = resp.json()

            if data_json.get("code") != 100 or not data_json.get("success", False):
                raise RuntimeError(f"{api_name} API 错误: {data_json.get('message')}")

            content = data_json.get("data", {}).get("content", [])
            all_data.extend(content)
            print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)} 条")

            # The server marks the final page via data.last; defaulting to True
            # ensures a malformed payload cannot loop forever.
            if data_json.get("data", {}).get("last", True):
                break

            page_num += 1

        return all_data


def save_json_to_pg(data: list, api_id: str) -> None:
    """Write one JSON batch to PostgreSQL.

    Marks every previous row for *api_id* as loaded (is_loaded='1'), then
    inserts the new payload as a single JSON document row.
    """
    print(f"[save_to_pg] API={api_id} 写入 PG,记录数={len(data)}")
    sql = """
    UPDATE data_api.api_data
    SET is_loaded = '1'
    WHERE api_id = %s;

    INSERT INTO data_api.api_data
        (id, api_id, data, total_num, is_loaded, status,
         request_tm, execute_tm, remark)
    VALUES (%s, %s, %s, %s, '0', '0',
            current_timestamp(0), current_timestamp(0), '');
    """
    try:
        with psycopg2.connect(**PG_DSN) as conn:
            with conn.cursor() as cur:
                # Parameter order: UPDATE filter, then INSERT values
                # (id, api_id, data, total_num).
                cur.execute(sql,
                            (api_id,
                             str(uuid.uuid4()),
                             api_id,
                             json.dumps(data, ensure_ascii=False),
                             len(data)))
            conn.commit()
            print(f"[save_to_pg] API={api_id} 写入完成")
    except Exception as e:
        # FIX: chain the original exception so the real traceback survives.
        raise RuntimeError(f"PG写入错误: {e}") from e


def get_previous_date(days: int = 0) -> str:
    """Return the date *days* days before today as 'YYYY-MM-DD'."""
    return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")


def main():
    """Entry point: pull customer-visit data and stage it in PostgreSQL."""
    client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)

    # Customer-visit data for the trailing 5 days.
    visits_data = client.fetch_all_pages(
        api_name="visits",
        params={"startInsertDate": get_previous_date(5), "endInsertDate": get_previous_date(0)}
    )
    save_json_to_pg(visits_data, "partner1site_visits")

    # Disabled sources kept for reference; re-enable together with their
    # BASE_URLS entries.
    """
    # 报备数据
    reports_data = client.fetch_all_pages(
        api_name="reports",
        params={"startApplyDate": get_previous_date(5), "endApplyDate": get_previous_date(0)}
    )
    save_json_to_pg(reports_data, "partner1site_reports")

    # POS 数据
    pos_data = client.fetch_all_pages(
        api_name="pos_datas",
        params={"startPosInsertDate": get_previous_date(10), "endPosInsertDate": get_previous_date(0)}
    )
    save_json_to_pg(pos_data, "partner1site_pos_datas")

    # 客户及联系人数据
    cust_contact_data = client.fetch_all_pages(
        api_name="customer_and_contact_datas",
        params={"customerId": 0, "customerContactId": 0}
    )
    save_json_to_pg(cust_contact_data, "partner1site_cust_contact")
    """


if __name__ == "__main__":
    main()

# --- dev/workflow/TK_Cust/partner1site/获取客户拜访数据/part_summary_visit_load.sql (new file) ---
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on

/* Full refresh: clear the target table, then reload it from the most
   recently staged, not-yet-loaded JSON batch in data_api.api_data. */
DELETE FROM data_api.partner_summary_visit;

insert into data_api.partner_summary_visit (
    alias
    , area
    , attachment_number
    , comment_number
    , contact_address
    , contact_city
    , contact_department
    , contact_district
    , contact_email
    , contact_name
    , contact_phone
    , contact_province
    , contact_ways
    , customer_category
    , customer_category_type
    , customer_name
    , dealer_name
    , industry
    , insert_date
    , intention_product
    , into_report
    , like_name
    , like_number
    , sub_industry
    , tsm_names_by_alias
    , visit_remark
    , visitor
    , etl_tx_dt
)
select
    /* Blank-only strings become NULL; everything else is kept verbatim. */
    case when trim(both from elem->>'alias')='' then null else (elem->>'alias')::text end
    , case when trim(both from elem->>'area')='' then null else (elem->>'area')::text end
    , case when trim(both from elem->>'attachmentNumber')='' then null else (elem->>'attachmentNumber')::text end
    , case when trim(both from elem->>'commentNumber')='' then null else (elem->>'commentNumber')::text end
    , case when trim(both from elem->>'contactAddress')='' then null else (elem->>'contactAddress')::text end
    , case when trim(both from elem->>'contactCity')='' then null else (elem->>'contactCity')::text end
    , case when trim(both from elem->>'contactDepartment')='' then null else (elem->>'contactDepartment')::text end
    , case when trim(both from elem->>'contactDistrict')='' then null else (elem->>'contactDistrict')::text end
    , case when trim(both from elem->>'contactEmail')='' then null else (elem->>'contactEmail')::text end
    , case when trim(both from elem->>'contactName')='' then null else (elem->>'contactName')::text end
    , case when trim(both from elem->>'contactPhone')='' then null else (elem->>'contactPhone')::text end
    , case when trim(both from elem->>'contactProvince')='' then null else (elem->>'contactProvince')::text end
    , case when trim(both from elem->>'contactWays')='' then null else (elem->>'contactWays')::text end
    , case when trim(both from elem->>'customerCategory')='' then null else (elem->>'customerCategory')::text end
    , case when trim(both from elem->>'customerCategoryType')='' then null else (elem->>'customerCategoryType')::text end
    , case when trim(both from elem->>'customerName')='' then null else (elem->>'customerName')::text end
    , case when trim(both from elem->>'dealerName')='' then null else (elem->>'dealerName')::text end
    , case when trim(both from elem->>'industry')='' then null else (elem->>'industry')::text end
    , case when trim(both from elem->>'insertDate')='' then null else (elem->>'insertDate')::text end
    , case when trim(both from elem->>'intentionProduct')='' then null else (elem->>'intentionProduct')::text end
    , case when trim(both from elem->>'intoReport')='' then null else (elem->>'intoReport')::text end
    , case when trim(both from elem->>'likeName')='' then null else (elem->>'likeName')::text end
    , case when trim(both from elem->>'likeNumber')='' then null else (elem->>'likeNumber')::text end
    , case when trim(both from elem->>'subIndustry')='' then null else (elem->>'subIndustry')::text end
    , case when trim(both from elem->>'tsmNamesByAlias')='' then null else (elem->>'tsmNamesByAlias')::text end
    , case when trim(both from elem->>'visitRemark')='' then null else (elem->>'visitRemark')::text end
    , case when trim(both from elem->>'visitor')='' then null else (elem->>'visitor')::text end
    , CURRENT_TIMESTAMP(0)
from (
    /* Latest unloaded batch for this API.
       FIX: api_id must match the value written by part_summary_visit_feign.py
       ('partner1site_visits'); the previous hard-coded UUID never matched any
       staged row, so the load was always empty. */
    select data
    from data_api.api_data
    where api_id = 'partner1site_visits'
      and is_loaded = '0'
    order by request_tm desc
    limit 1
) batch
/* FIX: expand the JSON array once via LATERAL instead of invoking
   json_array_elements() 27 times in the select list — same rows, one pass. */
cross join lateral json_array_elements(batch.data::json) as elem;

update data_api.api_data
set is_loaded = '1',
    status = '1',
    request_tm = current_timestamp(0)
where api_id = 'partner1site_visits';
\q