add workflow partner1site, dev

This commit is contained in:
root 2025-09-28 14:43:22 +08:00
parent 8866f8d9ec
commit 69dada94c3
3 changed files with 338 additions and 0 deletions

View File

@ -0,0 +1,60 @@
#!/usr/bin/python
# -*- encoding=utf-8 -*-
"""Airflow DAG ``wf_dag_partner1site``.

Runs daily at 01:00: fetches partner1site summary-visit data over SSH
(feign task), loads it into PostgreSQL (load task), and emails an alert
if either task fails.
"""
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import json
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

# Reusable SSH connection to the ETL host (Airflow connection "ssh_air").
sshHook = SSHHook(ssh_conn_id='ssh_air')

default_args = {
    'owner': 'tek_newsletter@163.com',
    'email_on_failure': True,
    'email_on_retry': True,
    'start_date': datetime(2024, 1, 1),
    'depends_on_past': False,
    'retries': 6,
    'retry_delay': timedelta(minutes=10),
}

dag = DAG(
    'wf_dag_partner1site',
    default_args=default_args,
    schedule_interval="0 1 * * *",  # daily at 01:00
    catchup=False,
    dagrun_timeout=timedelta(minutes=160),
    max_active_runs=3,
)

# Alert email, fired when at least one direct upstream task failed.
task_failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="task_failed",
    to=["tek_newsletter@163.com"],
    cc=[""],  # NOTE(review): empty string is not a valid address — confirm intent
    subject="partner1site_failed",
    # Fix: removed a stray '"' that had leaked into the HTML body.
    html_content='<h3>您好partner1site作业失败请及时处理</h3>')

# Step 1: call the partner1site API and stage the raw JSON.
part_summary_visit_feign = SSHOperator(
    ssh_hook=sshHook,
    task_id='part_summary_visit_feign',
    command='python3 /data/airflow/etl/API/part_summary_visit_feign.py',
    depends_on_past=False,
    retries=3,
    dag=dag)

# Step 2: run the SQL load for the staged batch ({{ ds_nodash }} = run date).
part_summary_visit_load = SSHOperator(
    ssh_hook=sshHook,
    task_id='part_summary_visit_load',
    command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
    params={'my_param': "part_summary_visit_load"},
    depends_on_past=False,
    retries=3,
    dag=dag)

part_summary_visit_feign >> part_summary_visit_load
# Fix: ONE_FAILED only looks at *direct* upstream tasks in the "failed" state.
# Previously only the load task fed task_failed, so a feign failure left the
# load task upstream_failed and the alert was skipped. Wire both tasks in.
part_summary_visit_feign >> task_failed
part_summary_visit_load >> task_failed

View File

@ -0,0 +1,173 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
partner1site 多数据来源 API 请求模块
支持自动分页并存储到 PostgreSQL
"""
import random
import hmac
import hashlib
import base64
import requests
import json
import uuid
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any
import psycopg2
# ======= Configuration =======
# NOTE(review): API keys and the DB password are hard-coded in source —
# move them to environment variables or a secrets backend.
ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc"
SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5"
# Endpoint URLs are placeholders, replaced externally; only "visits" is active.
BASE_URLS = {
    "visits": "http://onesite.tek.cn/api/summary/visits",  # customer visit data
    # "reports": "http://onesite.tek.cn/api/summary/visits",  # filing/report data
    # "pos_datas": "http://onesite.tek.cn/api/summary/visits",  # POS data
    # "customer_and_contact_datas": "http://onesite.tek.cn/api/summary/visits"  # customer & contact data
}
# PostgreSQL connection parameters, consumed as psycopg2.connect(**PG_DSN).
PG_DSN = dict(
    database="dataops_db",
    user="dbuser_dba",
    password="EmBRxnmmjnE3",
    host="124.221.232.219",
    port="5432"
)
# ======================
class Partner1SiteClient:
    """Client for the partner1site summary APIs.

    Generates per-request HMAC-SHA1 tokens and transparently walks
    paginated endpoints.
    """

    def __init__(self, access_key: str, secret_key: str):
        self.ak = access_key
        self.sk = secret_key

    @staticmethod
    def urlsafe_b64encode(data: bytes) -> str:
        """Return *data* encoded as a URL-safe Base64 string."""
        return base64.urlsafe_b64encode(data).decode()

    def gen_token(self, expire_sec: int = 600) -> str:
        """Build an API token of the form ``<ak>:<signature>:<encoded-params>``."""
        nonce = str(random.randint(100000, 999999))
        expires_at = int(datetime.now(timezone.utc).timestamp()) + expire_sec
        # URL-safe Base64 of "nonce:deadline".
        encoded_params = self.urlsafe_b64encode(f"{nonce}:{expires_at}".encode())
        # HMAC-SHA1 over the *encoded* parameter string; the hex digest text is
        # itself Base64-encoded (the scheme originally negotiated with the API).
        mac = hmac.new(self.sk.encode(), encoded_params.encode(), hashlib.sha1)
        signature = self.urlsafe_b64encode(mac.hexdigest().encode())
        return f"{self.ak}:{signature}:{encoded_params}"

    def fetch_all_pages(self, api_name: str, params: Dict[str, Any]):
        """Fetch every page of *api_name* and return the concatenated rows."""
        if api_name not in BASE_URLS:
            raise ValueError(f"未知 API 数据来源: {api_name}")
        url = BASE_URLS[api_name]

        collected: list = []
        page_num = 0
        page_size = 50  # fixed page capacity
        is_last = False
        while not is_last:
            query = dict(params)
            query["token"] = self.gen_token()
            query["size"] = page_size
            query["page"] = page_num

            resp = requests.get(url, params=query, timeout=15)
            resp.raise_for_status()
            payload = resp.json()
            if payload.get("code") != 100 or not payload.get("success", False):
                raise RuntimeError(f"{api_name} API 错误: {payload.get('message')}")

            body = payload.get("data", {})
            content = body.get("content", [])
            collected.extend(content)
            print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)}")

            # Server marks the final page; missing flag defaults to "last".
            is_last = body.get("last", True)
            page_num += 1
        return collected
def save_json_to_pg(data: list, api_id: str) -> None:
    """Persist one API batch: soft-delete prior rows, then insert the new one.

    Both statements run in a single ``execute()``; psycopg2 fills the five
    placeholders in order across the two statements.
    """
    print(f"[save_to_pg] API={api_id} 写入 PG记录数={len(data)}")
    sql = """
    UPDATE data_api.api_data
    SET is_loaded = '1'
    WHERE api_id = %s;
    INSERT INTO data_api.api_data
    (id, api_id, data, total_num, is_loaded, status,
    request_tm, execute_tm, remark)
    VALUES (%s, %s, %s, %s, '0', '0',
    current_timestamp(0), current_timestamp(0), '');
    """
    bind_values = (
        api_id,                                 # UPDATE ... WHERE api_id
        str(uuid.uuid4()),                      # new surrogate id
        api_id,
        json.dumps(data, ensure_ascii=False),   # raw payload as JSON text
        len(data),
    )
    try:
        with psycopg2.connect(**PG_DSN) as connection:
            with connection.cursor() as cursor:
                cursor.execute(sql, bind_values)
            connection.commit()
            print(f"[save_to_pg] API={api_id} 写入完成")
    except Exception as e:
        raise RuntimeError(f"PG写入错误: {e}")
def get_previous_date(days: int = 0) -> str:
    """Return the date *days* days before today, formatted ``YYYY-MM-DD``."""
    target = datetime.now() - timedelta(days=days)
    return target.strftime("%Y-%m-%d")
def main():
    """Fetch partner1site customer-visit data and stage it in PostgreSQL."""
    client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)

    # Customer visits: a rolling 5-day window ending today.
    window = {
        "startInsertDate": get_previous_date(5),
        "endInsertDate": get_previous_date(0),
    }
    visits_data = client.fetch_all_pages(api_name="visits", params=window)
    save_json_to_pg(visits_data, "partner1site_visits")

    # The remaining feeds are currently disabled; kept for future activation.
    """
    # 报备数据
    reports_data = client.fetch_all_pages(
        api_name="reports",
        params={"startApplyDate": get_previous_date(5), "endApplyDate": get_previous_date(0)}
    )
    save_json_to_pg(reports_data, "partner1site_reports")
    # POS 数据
    pos_data = client.fetch_all_pages(
        api_name="pos_datas",
        params={"startPosInsertDate": get_previous_date(10), "endPosInsertDate": get_previous_date(0)}
    )
    save_json_to_pg(pos_data, "partner1site_pos_datas")
    # 客户及联系人数据
    cust_contact_data = client.fetch_all_pages(
        api_name="customer_and_contact_datas",
        params={"customerId": 0, "customerContactId": 0}
    )
    save_json_to_pg(cust_contact_data, "partner1site_cust_contact")
    """


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,105 @@
/*******Main Section**************************************************************************/
-- Load the latest partner1site "summary visit" JSON batch (staged in
-- data_api.api_data by the feign task) into the typed table
-- data_api.partner_summary_visit, then mark the batch as loaded.
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on

-- Full refresh: the target table only ever holds the most recent batch.
-- NOTE(review): if no unloaded batch exists the SELECT below yields 0 rows
-- and the table ends up empty — confirm this is acceptable.
DELETE FROM data_api.partner_summary_visit;

insert into data_api.partner_summary_visit (
  alias
  , area
  , attachment_number
  , comment_number
  , contact_address
  , contact_city
  , contact_department
  , contact_district
  , contact_email
  , contact_name
  , contact_phone
  , contact_province
  , contact_ways
  , customer_category
  , customer_category_type
  , customer_name
  , dealer_name
  , industry
  , insert_date
  , intention_product
  , into_report
  , like_name
  , like_number
  , sub_industry
  , tsm_names_by_alias
  , visit_remark
  , visitor
  , etl_tx_dt
)
select
  -- Blank-to-NULL normalization; non-blank values keep their original
  -- (untrimmed) text.
  case when trim(both from alias)='' then null else alias::text end alias
  , case when trim(both from area)='' then null else area::text end area
  , case when trim(both from attachment_number)='' then null else attachment_number::text end attachment_number
  , case when trim(both from comment_number)='' then null else comment_number::text end comment_number
  , case when trim(both from contact_address)='' then null else contact_address::text end contact_address
  , case when trim(both from contact_city)='' then null else contact_city::text end contact_city
  , case when trim(both from contact_department)='' then null else contact_department::text end contact_department
  , case when trim(both from contact_district)='' then null else contact_district::text end contact_district
  , case when trim(both from contact_email)='' then null else contact_email::text end contact_email
  , case when trim(both from contact_name)='' then null else contact_name::text end contact_name
  , case when trim(both from contact_phone)='' then null else contact_phone::text end contact_phone
  , case when trim(both from contact_province)='' then null else contact_province::text end contact_province
  , case when trim(both from contact_ways)='' then null else contact_ways::text end contact_ways
  , case when trim(both from customer_category)='' then null else customer_category::text end customer_category
  , case when trim(both from customer_category_type)='' then null else customer_category_type::text end customer_category_type
  , case when trim(both from customer_name)='' then null else customer_name::text end customer_name
  , case when trim(both from dealer_name)='' then null else dealer_name::text end dealer_name
  , case when trim(both from industry)='' then null else industry::text end industry
  , case when trim(both from insert_date)='' then null else insert_date::text end insert_date
  , case when trim(both from intention_product)='' then null else intention_product::text end intention_product
  , case when trim(both from into_report)='' then null else into_report::text end into_report
  , case when trim(both from like_name)='' then null else like_name::text end like_name
  , case when trim(both from like_number)='' then null else like_number::text end like_number
  , case when trim(both from sub_industry)='' then null else sub_industry::text end sub_industry
  , case when trim(both from tsm_names_by_alias)='' then null else tsm_names_by_alias::text end tsm_names_by_alias
  , case when trim(both from visit_remark)='' then null else visit_remark::text end visit_remark
  , case when trim(both from visitor)='' then null else visitor::text end visitor
  , etl_tx_dt
from (
  select
    elem->>'alias' alias
    , elem->>'area' area
    , elem->>'attachmentNumber' attachment_number
    , elem->>'commentNumber' comment_number
    , elem->>'contactAddress' contact_address
    , elem->>'contactCity' contact_city
    , elem->>'contactDepartment' contact_department
    , elem->>'contactDistrict' contact_district
    , elem->>'contactEmail' contact_email
    , elem->>'contactName' contact_name
    , elem->>'contactPhone' contact_phone
    , elem->>'contactProvince' contact_province
    , elem->>'contactWays' contact_ways
    , elem->>'customerCategory' customer_category
    , elem->>'customerCategoryType' customer_category_type
    , elem->>'customerName' customer_name
    , elem->>'dealerName' dealer_name
    , elem->>'industry' industry
    , elem->>'insertDate' insert_date
    , elem->>'intentionProduct' intention_product
    , elem->>'intoReport' into_report
    , elem->>'likeName' like_name
    , elem->>'likeNumber' like_number
    , elem->>'subIndustry' sub_industry
    , elem->>'tsmNamesByAlias' tsm_names_by_alias
    , elem->>'visitRemark' visit_remark
    , elem->>'visitor' visitor
    , CURRENT_TIMESTAMP(0) etl_tx_dt
  from (
    select data
    from data_api.api_data
    WHERE api_id='2460976d-00c1-47d9-84b2-33e66d68'
      and is_loaded = '0'
    order by request_tm desc
    limit 1
  ) latest
  -- Fix: expand the JSON array ONCE via LATERAL instead of 27 parallel
  -- json_array_elements() calls in the select list. Same rows per element,
  -- but a single set-returning scan and no reliance on SRF lockstep.
  cross join lateral json_array_elements(latest.data::json) elem
) p;

-- Mark the staged batch as processed.
-- NOTE(review): this overwrites request_tm with the load time — presumably
-- execute_tm was intended; confirm against downstream consumers.
update data_api.api_data
set is_loaded = '1',
    status = '1',
    request_tm = current_timestamp(0)
where api_id='2460976d-00c1-47d9-84b2-33e66d68';
\q