add workflow partner1site,dev

This commit is contained in:
root 2025-09-28 16:08:39 +08:00
parent 84c46d9d77
commit 51d4e66cad
3 changed files with 394 additions and 0 deletions

View File

@ -66,5 +66,25 @@ depends_on_past=False,
retries=3,
dag=dag)
part_summary_report_feign = SSHOperator(
ssh_hook=sshHook,
task_id='part_summary_report_feign',
command='python3 /data/airflow/etl/API/part_summary_report_feign.py',
depends_on_past=False,
retries=3,
dag=dag)
part_summary_report_load = SSHOperator(
ssh_hook=sshHook,
task_id='part_summary_report_load',
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
params={'my_param':"part_summary_report_load"},
depends_on_past=False,
retries=3,
dag=dag)
part_summary_report_feign >> part_summary_report_load
part_summary_visit_load >> partner_summary_visit_9060
partner_summary_visit_9060 >> task_failed
part_summary_report_load >> task_failed

View File

@ -0,0 +1,179 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Partner1site 全接口抓取脚本
分页结束条件hasNext == False
"""
import random
import hmac
import hashlib
import base64
import requests
import json
import uuid
from datetime import datetime, timezone, timedelta
from typing import Dict, Any
import psycopg2
# ======= 配置区 =======
ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc"
SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5"
# URL 用占位符(外部替换)
BASE_URLS = {
# "visits": "http://onesite.tek.cn/api/summary/reports", # 客户拜访数据
"reports": "http://onesite.tek.cn/api/summary/reports" # 报备数据
# "pos_datas": "http://onesite.tek.cn/api/summary/reports", # POS数据
# "customer_and_contact_datas": "http://onesite.tek.cn/api/summary/reports" # 客户及联系人数据
}
PG_DSN = dict(
database="dataops_db",
user="dbuser_dba",
password="EmBRxnmmjnE3",
host="124.221.232.219",
port="5432"
)
API_ID = "89190c80-b241-4453-97ef-f0fbac2d" # 外部传入 api_id占位符
# ======================
class Partner1SiteClient:
"""Partner1site API 客户端"""
def __init__(self, access_key: str, secret_key: str):
self.ak = access_key
self.sk = secret_key
@staticmethod
def urlsafe_b64encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode()
def gen_token(self, expire_sec: int = 600) -> str:
"""生成 API Token"""
random_num = str(random.randint(100000, 999999))
deadline = int(datetime.now(timezone.utc).timestamp()) + expire_sec
parm_str = f"{random_num}:{deadline}"
enc_parm = self.urlsafe_b64encode(parm_str.encode())
sign = hmac.new(self.sk.encode(), enc_parm.encode(), hashlib.sha1).digest()
enc_sign = self.urlsafe_b64encode(sign.hex().encode())
return f"{self.ak}:{enc_sign}:{enc_parm}"
def fetch_all_pages(self, api_name: str, params: Dict[str, Any]):
"""通用分页请求结束条件hasNext == False"""
if api_name not in BASE_URLS:
raise ValueError(f"未知 API 数据来源: {api_name}")
base_url = BASE_URLS[api_name]
all_data = []
page_num = 0
page_size = 50 # 固定每页大小
while True:
token = self.gen_token()
params_with_paging = dict(params)
params_with_paging.update({
"token": token,
"size": page_size,
"page": page_num
})
resp = requests.get(base_url, params=params_with_paging, timeout=30)
resp.raise_for_status()
data_json = resp.json()
if data_json.get("code") != 100 or not data_json.get("success", False):
raise RuntimeError(f"{api_name} API 错误: {data_json.get('message')}")
content = data_json.get("data", {}).get("content", [])
all_data.extend(content)
total_elements = data_json.get("data", {}).get("totalElements")
has_next = data_json.get("data", {}).get("hasNext", False)
print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}")
if not has_next:
break
page_num += 1
return all_data
def save_json_to_pg(data: list, api_id: str) -> None:
"""写入 PostgreSQL软删历史 + 插入新数据"""
print(f"[save_to_pg] API={api_id} 写入 PG记录数={len(data)}")
sql = """
UPDATE data_api.api_data
SET is_loaded = '1'
WHERE api_id = %s;
INSERT INTO data_api.api_data
(id, api_id, data, total_num, is_loaded, status,
request_tm, execute_tm, remark)
VALUES (%s, %s, %s, %s, '0', '0',
current_timestamp(0), current_timestamp(0), '');
"""
try:
with psycopg2.connect(**PG_DSN) as conn:
with conn.cursor() as cur:
cur.execute(sql,
(api_id,
str(uuid.uuid4()),
api_id,
json.dumps(data, ensure_ascii=False),
len(data)))
conn.commit()
print(f"[save_to_pg] API={api_id} 写入完成")
except Exception as e:
raise RuntimeError(f"PG写入错误: {e}")
def get_previous_date(days: int = 0) -> str:
return (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d")
def main():
client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)
# ✅ 客户拜访数据(执行)
"""
reports_data = client.fetch_all_pages(
api_name="reports",
params={}
)
save_json_to_pg(reports_data, API_ID)
# ❌ POS 数据(暂时注释)
"""
visits_data = client.fetch_all_pages(
api_name="visits",
params={} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
)
save_json_to_pg(visits_data, API_ID)
"""
# ❌ POS 数据(暂时注释)
"""
pos_data = client.fetch_all_pages(
api_name="pos_datas",
params={}
)
save_json_to_pg(pos_data, API_ID)
"""
# ❌ 客户及联系人数据(暂时注释)
"""
cust_contact_data = client.fetch_all_pages(
api_name="customer_and_contact_datas",
params={}
)
save_json_to_pg(cust_contact_data, API_ID)
"""
if __name__ == "__main__":
main()

View File

@ -0,0 +1,195 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.partner_summary_report;
insert into data_api.partner_summary_report (
accepted
, address
, advantage
, alias
, apply_admin_name
, apply_date
, apply_email
, apply_phone
, apply_situation
, benefit
, check_admin_name
, check_date
, check_status_str
, city
, complete_date
, contact_name
, contact_phone
, count
, customer_name
, dealer_name
, delay
, delay_reason
, department
, department_attr
, district
, end_date
, feature
, funding_situation
, industry
, integrated_sales
, leads_num
, list_price
, lost_reason
, lost_reason_type
, other_remark
, package_name
, package_type_name
, product_name
, product_sku_type
, product_type
, progress
, province
, region_name
, remark
, report_num
, rp_total_price
, series_name
, skus
, source
, sub_industry
, total_price
, trader_contact_name
, trader_contact_phone
, trader_name
, tsm_names_by_alias
, update_date
, way
,etl_tx_dt
)
select
case when trim(both from accepted)='' then null else accepted::text end accepted
, case when trim(both from address)='' then null else address::text end address
, case when trim(both from advantage)='' then null else advantage::text end advantage
, case when trim(both from alias)='' then null else alias::text end alias
, case when trim(both from apply_admin_name)='' then null else apply_admin_name::text end apply_admin_name
, case when trim(both from apply_date)='' then null else apply_date::text end apply_date
, case when trim(both from apply_email)='' then null else apply_email::text end apply_email
, case when trim(both from apply_phone)='' then null else apply_phone::text end apply_phone
, case when trim(both from apply_situation)='' then null else apply_situation::text end apply_situation
, case when trim(both from benefit)='' then null else benefit::text end benefit
, case when trim(both from check_admin_name)='' then null else check_admin_name::text end check_admin_name
, case when trim(both from check_date)='' then null else check_date::text end check_date
, case when trim(both from check_status_str)='' then null else check_status_str::text end check_status_str
, case when trim(both from city)='' then null else city::text end city
, case when trim(both from complete_date)='' then null else complete_date::text end complete_date
, case when trim(both from contact_name)='' then null else contact_name::text end contact_name
, case when trim(both from contact_phone)='' then null else contact_phone::text end contact_phone
, case when trim(both from count)='' then null else count::text end count
, case when trim(both from customer_name)='' then null else customer_name::text end customer_name
, case when trim(both from dealer_name)='' then null else dealer_name::text end dealer_name
, case when trim(both from delay)='' then null else delay::text end delay
, case when trim(both from delay_reason)='' then null else delay_reason::text end delay_reason
, case when trim(both from department)='' then null else department::text end department
, case when trim(both from department_attr)='' then null else department_attr::text end department_attr
, case when trim(both from district)='' then null else district::text end district
, case when trim(both from end_date)='' then null else end_date::text end end_date
, case when trim(both from feature)='' then null else feature::text end feature
, case when trim(both from funding_situation)='' then null else funding_situation::text end funding_situation
, case when trim(both from industry)='' then null else industry::text end industry
, case when trim(both from integrated_sales)='' then null else integrated_sales::text end integrated_sales
, case when trim(both from leads_num)='' then null else leads_num::text end leads_num
, case when trim(both from list_price)='' then null else list_price::text end list_price
, case when trim(both from lost_reason)='' then null else lost_reason::text end lost_reason
, case when trim(both from lost_reason_type)='' then null else lost_reason_type::text end lost_reason_type
, case when trim(both from other_remark)='' then null else other_remark::text end other_remark
, case when trim(both from package_name)='' then null else package_name::text end package_name
, case when trim(both from package_type_name)='' then null else package_type_name::text end package_type_name
, case when trim(both from product_name)='' then null else product_name::text end product_name
, case when trim(both from product_sku_type)='' then null else product_sku_type::text end product_sku_type
, case when trim(both from product_type)='' then null else product_type::text end product_type
, case when trim(both from progress)='' then null else progress::text end progress
, case when trim(both from province)='' then null else province::text end province
, case when trim(both from region_name)='' then null else region_name::text end region_name
, case when trim(both from remark)='' then null else remark::text end remark
, case when trim(both from report_num)='' then null else report_num::text end report_num
, case when trim(both from rp_total_price)='' then null else rp_total_price::text end rp_total_price
, case when trim(both from series_name)='' then null else series_name::text end series_name
, case when trim(both from skus)='' then null else skus::text end skus
, case when trim(both from source)='' then null else source::text end source
, case when trim(both from sub_industry)='' then null else sub_industry::text end sub_industry
, case when trim(both from total_price)='' then null else total_price::text end total_price
, case when trim(both from trader_contact_name)='' then null else trader_contact_name::text end trader_contact_name
, case when trim(both from trader_contact_phone)='' then null else trader_contact_phone::text end trader_contact_phone
, case when trim(both from trader_name)='' then null else trader_name::text end trader_name
, case when trim(both from tsm_names_by_alias)='' then null else tsm_names_by_alias::text end tsm_names_by_alias
, case when trim(both from update_date)='' then null else update_date::text end update_date
, case when trim(both from way)='' then null else way::text end way
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'accepted') accepted
, (json_array_elements(data::json)::json->>'address') address
, (json_array_elements(data::json)::json->>'advantage') advantage
, (json_array_elements(data::json)::json->>'alias') alias
, (json_array_elements(data::json)::json->>'applyAdminName') apply_admin_name
, (json_array_elements(data::json)::json->>'applyDate') apply_date
, (json_array_elements(data::json)::json->>'applyEmail') apply_email
, (json_array_elements(data::json)::json->>'applyPhone') apply_phone
, (json_array_elements(data::json)::json->>'applySituation') apply_situation
, (json_array_elements(data::json)::json->>'benefit') benefit
, (json_array_elements(data::json)::json->>'checkAdminName') check_admin_name
, (json_array_elements(data::json)::json->>'checkDate') check_date
, (json_array_elements(data::json)::json->>'checkStatusStr') check_status_str
, (json_array_elements(data::json)::json->>'city') city
, (json_array_elements(data::json)::json->>'completeDate') complete_date
, (json_array_elements(data::json)::json->>'contactName') contact_name
, (json_array_elements(data::json)::json->>'contactPhone') contact_phone
, (json_array_elements(data::json)::json->>'count') count
, (json_array_elements(data::json)::json->>'customerName') customer_name
, (json_array_elements(data::json)::json->>'dealerName') dealer_name
, (json_array_elements(data::json)::json->>'delay') delay
, (json_array_elements(data::json)::json->>'delayReason') delay_reason
, (json_array_elements(data::json)::json->>'department') department
, (json_array_elements(data::json)::json->>'departmentAttr') department_attr
, (json_array_elements(data::json)::json->>'district') district
, (json_array_elements(data::json)::json->>'endDate') end_date
, (json_array_elements(data::json)::json->>'feature') feature
, (json_array_elements(data::json)::json->>'fundingSituation') funding_situation
, (json_array_elements(data::json)::json->>'industry') industry
, (json_array_elements(data::json)::json->>'integratedSales') integrated_sales
, (json_array_elements(data::json)::json->>'leadsNum') leads_num
, (json_array_elements(data::json)::json->>'listPrice') list_price
, (json_array_elements(data::json)::json->>'lostReason') lost_reason
, (json_array_elements(data::json)::json->>'lostReasonType') lost_reason_type
, (json_array_elements(data::json)::json->>'otherRemark') other_remark
, (json_array_elements(data::json)::json->>'packageName') package_name
, (json_array_elements(data::json)::json->>'packageTypeName') package_type_name
, (json_array_elements(data::json)::json->>'productName') product_name
, (json_array_elements(data::json)::json->>'productSkuType') product_sku_type
, (json_array_elements(data::json)::json->>'productType') product_type
, (json_array_elements(data::json)::json->>'progress') progress
, (json_array_elements(data::json)::json->>'province') province
, (json_array_elements(data::json)::json->>'regionName') region_name
, (json_array_elements(data::json)::json->>'remark') remark
, (json_array_elements(data::json)::json->>'reportNum') report_num
, (json_array_elements(data::json)::json->>'rpTotalPrice') rp_total_price
, (json_array_elements(data::json)::json->>'seriesName') series_name
, (json_array_elements(data::json)::json->>'skus') skus
, (json_array_elements(data::json)::json->>'source') source
, (json_array_elements(data::json)::json->>'subIndustry') sub_industry
, (json_array_elements(data::json)::json->>'totalPrice') total_price
, (json_array_elements(data::json)::json->>'traderContactName') trader_contact_name
, (json_array_elements(data::json)::json->>'traderContactPhone') trader_contact_phone
, (json_array_elements(data::json)::json->>'traderName') trader_name
, (json_array_elements(data::json)::json->>'tsmNamesByAlias') tsm_names_by_alias
, (json_array_elements(data::json)::json->>'updateDate') update_date
, (json_array_elements(data::json)::json->>'way') way
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='89190c80-b241-4453-97ef-f0fbac2d' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='89190c80-b241-4453-97ef-f0fbac2d';
\q