add workflow partner1site,dev

This commit is contained in:
root 2025-09-30 16:15:42 +08:00
parent 729092f3fa
commit 40b4f81a74
5 changed files with 313 additions and 406 deletions

View File

@ -22,10 +22,10 @@ SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5"
# URL 用占位符(外部替换)
BASE_URLS = {
# "visits": "http://onesite.tek.cn/api/summary/pos_datas", # 客户拜访数据
# "reports": "http://onesite.tek.cn/api/summary/pos_datas", # 报备数据
"visits": "http://onesite.tek.cn/api/summary/pos_datas", # 客户拜访数据
"reports": "http://onesite.tek.cn/api/summary/pos_datas", # 报备数据
"pos_datas": "http://onesite.tek.cn/api/summary/pos_datas", # POS数据
# "customer_and_contact_datas": "http://onesite.tek.cn/api/summary/pos_datas" # 客户及联系人数据
"customer_and_contact_datas": "http://onesite.tek.cn/api/summary/pos_datas" # 客户及联系人数据
}
PG_DSN = dict(
@ -63,13 +63,13 @@ class Partner1SiteClient:
def fetch_all_pages(self, api_name: str, params: Dict[str, Any]):
"""通用分页请求结束条件hasNext == False"""
if api_name not in BASE_URLS:
raise ValueError(f"未知 API 数据来源: {api_name}")
# if api_name not in BASE_URLS:
# raise ValueError(f"未知 API 数据来源: {api_name}")
base_url = BASE_URLS[api_name]
base_url = 'http://onesite.tek.cn/api/summary/pos_datas'
all_data = []
page_num = 0
page_size = 50 # 固定每页大小
page_size = 1000 # 固定每页大小
while True:
token = self.gen_token()
@ -85,14 +85,14 @@ class Partner1SiteClient:
data_json = resp.json()
if data_json.get("code") != 100 or not data_json.get("success", False):
raise RuntimeError(f"{api_name} API 错误: {data_json.get('message')}")
raise RuntimeError(f"获取POS数据 API 错误: {data_json.get('message')}")
content = data_json.get("data", {}).get("content", [])
all_data.extend(content)
total_elements = data_json.get("data", {}).get("totalElements")
has_next = data_json.get("data", {}).get("hasNext", False)
print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}")
print(f"[获取POS数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}")
if not has_next:
break
@ -138,38 +138,12 @@ def get_previous_date(days: int = 0) -> str:
def main():
client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)
# ✅ 客户拜访数据(执行)
"""
visits_data = client.fetch_all_pages(
data = client.fetch_all_pages(
api_name="visits",
params={} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
params={'startPosInsertDate':'2025-8-20',} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
)
save_json_to_pg(visits_data, API_ID)
"""
# ❌ 报备数据(暂时注释)
"""
reports_data = client.fetch_all_pages(
api_name="reports",
params={}
)
save_json_to_pg(reports_data, API_ID)
"""
save_json_to_pg(data, API_ID)
# ❌ POS 数据(暂时注释)
pos_data = client.fetch_all_pages(
api_name="pos_datas",
params={"startPosInsertDate":get_previous_date(3)}
)
save_json_to_pg(pos_data, API_ID)
# ❌ 客户及联系人数据(暂时注释)
"""
cust_contact_data = client.fetch_all_pages(
api_name="customer_and_contact_datas",
params={}
)
save_json_to_pg(cust_contact_data, API_ID)
"""
if __name__ == "__main__":

View File

@ -20,14 +20,6 @@ import psycopg2
ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc"
SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5"
# URL 用占位符(外部替换)
BASE_URLS = {
"visits": "http://onesite.tek.cn/api/summary/visits", # 客户拜访数据
"reports": "http://onesite.tek.cn/api/summary/visits", # 报备数据
"pos_datas": "http://onesite.tek.cn/api/summary/visits", # POS数据
"customer_and_contact_datas": "http://onesite.tek.cn/api/summary/visits" # 客户及联系人数据
}
PG_DSN = dict(
database="dataops_db",
user="dbuser_dba",
@ -63,13 +55,13 @@ class Partner1SiteClient:
def fetch_all_pages(self, api_name: str, params: Dict[str, Any]):
"""通用分页请求结束条件hasNext == False"""
if api_name not in BASE_URLS:
raise ValueError(f"未知 API 数据来源: {api_name}")
# if api_name not in BASE_URLS:
# raise ValueError(f"未知 API 数据来源: {api_name}")
base_url = BASE_URLS[api_name]
base_url = 'http://onesite.tek.cn/api/summary/visits'
all_data = []
page_num = 0
page_size = 50 # 固定每页大小
page_size = 1000 # 固定每页大小
while True:
token = self.gen_token()
@ -85,14 +77,14 @@ class Partner1SiteClient:
data_json = resp.json()
if data_json.get("code") != 100 or not data_json.get("success", False):
raise RuntimeError(f"{api_name} API 错误: {data_json.get('message')}")
raise RuntimeError(f"获取客户拜访数据 API 错误: {data_json.get('message')}")
content = data_json.get("data", {}).get("content", [])
all_data.extend(content)
total_elements = data_json.get("data", {}).get("totalElements")
has_next = data_json.get("data", {}).get("hasNext", False)
print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}")
print(f"[获取客户拜访数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}")
if not has_next:
break
@ -138,40 +130,12 @@ def get_previous_date(days: int = 0) -> str:
def main():
client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)
# ✅ 客户拜访数据(执行)
visits_data = client.fetch_all_pages(
data = client.fetch_all_pages(
api_name="visits",
# params={} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
params={"startInsertDate":"2025-08-20"}
params={'startInsertDate':'2025-8-20',} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
)
save_json_to_pg(visits_data, API_ID)
save_json_to_pg(data, API_ID)
# ❌ 报备数据(暂时注释)
"""
reports_data = client.fetch_all_pages(
api_name="reports",
params={}
)
save_json_to_pg(reports_data, API_ID)
"""
# ❌ POS 数据(暂时注释)
"""
pos_data = client.fetch_all_pages(
api_name="pos_datas",
params={}
)
save_json_to_pg(pos_data, API_ID)
"""
# ❌ 客户及联系人数据(暂时注释)
"""
cust_contact_data = client.fetch_all_pages(
api_name="customer_and_contact_datas",
params={}
)
save_json_to_pg(cust_contact_data, API_ID)
"""
if __name__ == "__main__":

View File

@ -33,6 +33,7 @@ insert into data_api.partner_summary_visit (
, tsm_names_by_alias
, visit_remark
, visitor
, id
,etl_tx_dt
)
select
@ -63,6 +64,7 @@ select
, case when trim(both from tsm_names_by_alias)='' then null else tsm_names_by_alias::text end tsm_names_by_alias
, case when trim(both from visit_remark)='' then null else visit_remark::text end visit_remark
, case when trim(both from visitor)='' then null else visitor::text end visitor
, case when trim(both from id)='' then null else id::text end id
,etl_tx_dt
from (
select
@ -93,6 +95,7 @@ select
, (json_array_elements(data::json)::json->>'tsmNamesByAlias') tsm_names_by_alias
, (json_array_elements(data::json)::json->>'visitRemark') visit_remark
, (json_array_elements(data::json)::json->>'visitor') visitor
, (json_array_elements(data::json)::json->>'id') id
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='2460976d-00c1-47d9-84b2-33e66d68' and is_loaded = '0' order by request_tm desc limit 1) p )p;

View File

@ -20,14 +20,6 @@ import psycopg2
ACCESS_KEY = "75c4ab4d-6a67-4aed-8b1d-5bb64fd36afc"
SECRET_KEY = "117347a7dd066a50a4d2973c5f3d5ba9101094c5"
# URL 用占位符(外部替换)
BASE_URLS = {
# "visits": "http://onesite.tek.cn/api/summary/reports", # 客户拜访数据
"reports": "http://onesite.tek.cn/api/summary/reports" # 报备数据
# "pos_datas": "http://onesite.tek.cn/api/summary/reports", # POS数据
# "customer_and_contact_datas": "http://onesite.tek.cn/api/summary/reports" # 客户及联系人数据
}
PG_DSN = dict(
database="dataops_db",
user="dbuser_dba",
@ -63,13 +55,13 @@ class Partner1SiteClient:
def fetch_all_pages(self, api_name: str, params: Dict[str, Any]):
"""通用分页请求结束条件hasNext == False"""
if api_name not in BASE_URLS:
raise ValueError(f"未知 API 数据来源: {api_name}")
# if api_name not in BASE_URLS:
# raise ValueError(f"未知 API 数据来源: {api_name}")
base_url = BASE_URLS[api_name]
base_url = 'http://onesite.tek.cn/api/summary/reports'
all_data = []
page_num = 0
page_size = 50 # 固定每页大小
page_size = 1000 # 固定每页大小
while True:
token = self.gen_token()
@ -85,14 +77,14 @@ class Partner1SiteClient:
data_json = resp.json()
if data_json.get("code") != 100 or not data_json.get("success", False):
raise RuntimeError(f"{api_name} API 错误: {data_json.get('message')}")
raise RuntimeError(f"获取报备数据 API 错误: {data_json.get('message')}")
content = data_json.get("data", {}).get("content", [])
all_data.extend(content)
total_elements = data_json.get("data", {}).get("totalElements")
has_next = data_json.get("data", {}).get("hasNext", False)
print(f"[{api_name}] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}")
print(f"[获取报备数据] 页码 {page_num} -> 本页 {len(content)} 条,累计 {len(all_data)} 条 / 总数 {total_elements}")
if not has_next:
break
@ -138,40 +130,14 @@ def get_previous_date(days: int = 0) -> str:
def main():
client = Partner1SiteClient(ACCESS_KEY, SECRET_KEY)
# ✅ 报备数据(执行)
reports_data = client.fetch_all_pages(
data = client.fetch_all_pages(
api_name="reports",
params={"startInsertDate":get_previous_date(3)}
params={"startInsertDate":"2000-01-01"} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
)
save_json_to_pg(reports_data, API_ID)
# ❌ 客户拜访数据(暂时注释)
"""
visits_data = client.fetch_all_pages(
api_name="visits",
params={} # 拉全量,若有默认时间限制可改成 {"startInsertDate":"2000-01-01","endInsertDate":get_previous_date(0)}
)
save_json_to_pg(visits_data, API_ID)
"""
# ❌ POS 数据(暂时注释)
"""
pos_data = client.fetch_all_pages(
api_name="pos_datas",
params={}
)
save_json_to_pg(pos_data, API_ID)
"""
save_json_to_pg(data, API_ID)
# ❌ 客户及联系人数据(暂时注释)
"""
cust_contact_data = client.fetch_all_pages(
api_name="customer_and_contact_datas",
params={}
)
save_json_to_pg(cust_contact_data, API_ID)
"""
if __name__ == "__main__":

View File

@ -6,8 +6,7 @@
DELETE FROM data_api.partner_summary_report;
insert into data_api.partner_summary_report (
accepted
, address
address
, advantage
, alias
, apply_admin_name
@ -48,6 +47,7 @@ insert into data_api.partner_summary_report (
, product_type
, progress
, province
, record_id
, region_name
, remark
, report_num
@ -66,8 +66,7 @@ insert into data_api.partner_summary_report (
,etl_tx_dt
)
select
case when trim(both from accepted)='' then null else accepted::text end accepted
, case when trim(both from address)='' then null else address::text end address
case when trim(both from address)='' then null else address::text end address
, case when trim(both from advantage)='' then null else advantage::text end advantage
, case when trim(both from alias)='' then null else alias::text end alias
, case when trim(both from apply_admin_name)='' then null else apply_admin_name::text end apply_admin_name
@ -108,6 +107,7 @@ select
, case when trim(both from product_type)='' then null else product_type::text end product_type
, case when trim(both from progress)='' then null else progress::text end progress
, case when trim(both from province)='' then null else province::text end province
, case when trim(both from record_id)='' then null else record_id::text end record_id
, case when trim(both from region_name)='' then null else region_name::text end region_name
, case when trim(both from remark)='' then null else remark::text end remark
, case when trim(both from report_num)='' then null else report_num::text end report_num
@ -126,8 +126,7 @@ select
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'accepted') accepted
, (json_array_elements(data::json)::json->>'address') address
(json_array_elements(data::json)::json->>'address') address
, (json_array_elements(data::json)::json->>'advantage') advantage
, (json_array_elements(data::json)::json->>'alias') alias
, (json_array_elements(data::json)::json->>'applyAdminName') apply_admin_name
@ -168,6 +167,7 @@ select
, (json_array_elements(data::json)::json->>'productType') product_type
, (json_array_elements(data::json)::json->>'progress') progress
, (json_array_elements(data::json)::json->>'province') province
, (json_array_elements(data::json)::json->>'recordId') record_id
, (json_array_elements(data::json)::json->>'regionName') region_name
, (json_array_elements(data::json)::json->>'remark') remark
, (json_array_elements(data::json)::json->>'reportNum') report_num