add workflow partner1site,dev

This commit is contained in:
root 2025-12-02 16:47:35 +08:00
parent 34f4c09c23
commit b81a551c84
7 changed files with 74 additions and 17 deletions

View File

@ -37,6 +37,9 @@ CREATE TABLE IF NOT EXISTS p20_pdm.t01_partner_pos (
, transfer varchar(100)
, tsm_names_by_alias varchar(100)
, zip varchar(100)
, net_usd text
, product_family_code text
, product_family_name text
, Etl_Batch_No varchar(50)
, Etl_First_Dt timestamp(0)
, Etl_Job varchar(200)
@ -83,6 +86,9 @@ CREATE TABLE IF NOT EXISTS p20_pdm.t01_partner_pos (
COMMENT ON COLUMN p20_pdm.t01_partner_pos.transfer IS '';
COMMENT ON COLUMN p20_pdm.t01_partner_pos.tsm_names_by_alias IS '';
COMMENT ON COLUMN p20_pdm.t01_partner_pos.zip IS '';
COMMENT ON COLUMN p20_pdm.t01_partner_pos.net_usd IS 'oac 系统原始产品价格字段';
COMMENT ON COLUMN p20_pdm.t01_partner_pos.product_family_code IS '';
COMMENT ON COLUMN p20_pdm.t01_partner_pos.product_family_name IS '';
COMMENT ON COLUMN p20_pdm.t01_partner_pos.Etl_Batch_No IS '作业批次号';
COMMENT ON COLUMN p20_pdm.t01_partner_pos.Etl_First_Dt IS '最初入库时间';
COMMENT ON COLUMN p20_pdm.t01_partner_pos.Etl_Job IS '作业名称';

View File

@ -4,11 +4,11 @@
/*Brilliance stems from wisdoms. */
/*************Head Section**************************************************************************/
/*Script Use: Periodically load data to :t01_partner_pos(Partner POS数据) */
/*Create Date:2025-09-30 19:12:24 */
/*Create Date:2025-12-02 16:43:54 */
/*SDM Developed By: dev */
/*SDM Developed Date: 2025-09-29 */
/*SDM Checked By: dev */
/*SDM Checked Date: 2025-09-30 */
/*SDM Checked Date: 2025-12-02 */
/*Script Developed By: dev */
/*Script Checked By: dev */
/*Source table 1: p10_sa.s98_s_partner_summary_pos */
@ -78,6 +78,9 @@ INSERT INTO t01_partner_pos_agi_CUR_I (
,transfer /**/
,tsm_names_by_alias /**/
,zip /**/
,net_usd /*oac 系统原始产品价格字段*/
,product_family_code /**/
,product_family_name /**/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Job /*作业名称*/
@ -123,6 +126,9 @@ SELECT
,COALESCE(TRIM(CAST(p0.transfer AS varchar(100))),'') /*transfer*/
,COALESCE(TRIM(CAST(p0.tsm_names_by_alias AS varchar(100))),'') /*tsm_names_by_alias*/
,COALESCE(TRIM(CAST(p0.zip AS varchar(100))),'') /*zip*/
,COALESCE(TRIM(p0.net_usd),'') /*net_usd*/
,COALESCE(TRIM(p0.product_family_code),'') /*product_family_code*/
,COALESCE(TRIM(p0.product_family_name),'') /*product_family_name*/
,0 /*Etl_Batch_No*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
,:ETLJOB /*Etl_Job*/
@ -173,6 +179,9 @@ FROM p10_sa.s98_s_partner_summary_pos p0
,transfer /**/
,tsm_names_by_alias /**/
,zip /**/
,net_usd /*oac 系统原始产品价格字段*/
,product_family_code /**/
,product_family_name /**/
,id /**/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
@ -219,6 +228,9 @@ FROM p10_sa.s98_s_partner_summary_pos p0
,P1.transfer /**/
,P1.tsm_names_by_alias /**/
,P1.zip /**/
,P1.net_usd /*oac 系统原始产品价格字段*/
,P1.product_family_code /**/
,P1.product_family_name /**/
,P1.id /**/
,P1.Etl_Batch_No /*作业批次号*/
,P1.Etl_First_Dt /*最初入库时间*/
@ -265,6 +277,9 @@ ON P1.address = P2.address
AND P1.transfer = P2.transfer
AND P1.tsm_names_by_alias = P2.tsm_names_by_alias
AND P1.zip = P2.zip
AND P1.net_usd = P2.net_usd
AND P1.product_family_code = P2.product_family_code
AND P1.product_family_name = P2.product_family_name
AND P1.id = P2.id
WHERE P2.address IS NULL
@ -302,6 +317,9 @@ WHERE P2.address IS NULL
OR P2.transfer IS NULL
OR P2.tsm_names_by_alias IS NULL
OR P2.zip IS NULL
OR P2.net_usd IS NULL
OR P2.product_family_code IS NULL
OR P2.product_family_name IS NULL
OR P2.id IS NULL
;
@ -342,6 +360,9 @@ WHERE P2.address IS NULL
,transfer /**/
,tsm_names_by_alias /**/
,zip /**/
,net_usd /*oac 系统原始产品价格字段*/
,product_family_code /**/
,product_family_name /**/
,id /**/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
@ -388,6 +409,9 @@ SELECT
,P1.transfer /**/
,P1.tsm_names_by_alias /**/
,P1.zip /**/
,P1.net_usd /*oac 系统原始产品价格字段*/
,P1.product_family_code /**/
,P1.product_family_name /**/
,P1.id /**/
,P1.Etl_Batch_No /*作业批次号*/
,P1.Etl_First_Dt /*最初入库时间*/
@ -436,6 +460,9 @@ DO UPDATE SET
,transfer=excluded.transfer
,tsm_names_by_alias=excluded.tsm_names_by_alias
,zip=excluded.zip
,net_usd=excluded.net_usd
,product_family_code=excluded.product_family_code
,product_family_name=excluded.product_family_name
,Etl_Batch_No=excluded.Etl_Batch_No
,Etl_First_Dt=excluded.Etl_First_Dt
,Etl_Job=excluded.Etl_Job

View File

@ -31,6 +31,7 @@ CREATE TABLE IF NOT EXISTS p20_pdm.t01_partner_visit (
, sub_industry varchar(100)
, tsm_names_by_alias varchar(100)
, customer_category varchar(100)
, update_date text
, Etl_Batch_No varchar(50)
, Etl_First_Dt timestamp(0)
, Etl_Job varchar(200)
@ -71,6 +72,7 @@ CREATE TABLE IF NOT EXISTS p20_pdm.t01_partner_visit (
COMMENT ON COLUMN p20_pdm.t01_partner_visit.sub_industry IS '子行业';
COMMENT ON COLUMN p20_pdm.t01_partner_visit.tsm_names_by_alias IS 'tsm名';
COMMENT ON COLUMN p20_pdm.t01_partner_visit.customer_category IS '客户类别';
COMMENT ON COLUMN p20_pdm.t01_partner_visit.update_date IS '';
COMMENT ON COLUMN p20_pdm.t01_partner_visit.Etl_Batch_No IS '作业批次号';
COMMENT ON COLUMN p20_pdm.t01_partner_visit.Etl_First_Dt IS '最初入库时间';
COMMENT ON COLUMN p20_pdm.t01_partner_visit.Etl_Job IS '作业名称';

View File

@ -4,11 +4,11 @@
/*Brilliance stems from wisdoms. */
/*************Head Section**************************************************************************/
/*Script Use: Periodically load data to :t01_partner_visit(Partner客户拜访记录) */
/*Create Date:2025-10-16 17:59:47 */
/*Create Date:2025-12-02 16:46:53 */
/*SDM Developed By: dev */
/*SDM Developed Date: 2025-09-29 */
/*SDM Checked By: dev */
/*SDM Checked Date: 2025-10-16 */
/*SDM Checked Date: 2025-12-02 */
/*Script Developed By: dev */
/*Script Checked By: dev */
/*Source table 1: p10_sa.s98_s_partner_summary_visit */
@ -72,6 +72,7 @@ INSERT INTO t01_partner_visit_agi_CUR_I (
,sub_industry /*子行业*/
,tsm_names_by_alias /*tsm名*/
,customer_category /*客户类别*/
,update_date /**/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Job /*作业名称*/
@ -111,6 +112,7 @@ SELECT
,COALESCE(TRIM(CAST(p0.sub_industry AS varchar(100))),'') /*sub_industry*/
,COALESCE(TRIM(CAST(p0.tsm_names_by_alias AS varchar(100))),'') /*tsm_names_by_alias*/
,COALESCE(TRIM(CAST(p0.customer_category AS varchar(100))),'') /*customer_category*/
,p0.update_date::date /*update_date*/
,0 /*Etl_Batch_No*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
,:ETLJOB /*Etl_Job*/
@ -155,6 +157,7 @@ FROM p10_sa.s98_s_partner_summary_visit p0
,sub_industry /*子行业*/
,tsm_names_by_alias /*tsm名*/
,customer_category /*客户类别*/
,update_date /**/
,id /*主键ID*/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
@ -195,6 +198,7 @@ FROM p10_sa.s98_s_partner_summary_visit p0
,P1.sub_industry /*子行业*/
,P1.tsm_names_by_alias /*tsm名*/
,P1.customer_category /*客户类别*/
,P1.update_date /**/
,P1.id /*主键ID*/
,P1.Etl_Batch_No /*作业批次号*/
,P1.Etl_First_Dt /*最初入库时间*/
@ -235,6 +239,7 @@ ON P1.insert_date = P2.insert_date
AND P1.sub_industry = P2.sub_industry
AND P1.tsm_names_by_alias = P2.tsm_names_by_alias
AND P1.customer_category = P2.customer_category
AND P1.update_date = P2.update_date
AND P1.id = P2.id
WHERE P2.insert_date IS NULL
@ -266,6 +271,7 @@ WHERE P2.insert_date IS NULL
OR P2.sub_industry IS NULL
OR P2.tsm_names_by_alias IS NULL
OR P2.customer_category IS NULL
OR P2.update_date IS NULL
OR P2.id IS NULL
;
@ -300,6 +306,7 @@ WHERE P2.insert_date IS NULL
,sub_industry /*子行业*/
,tsm_names_by_alias /*tsm名*/
,customer_category /*客户类别*/
,update_date /**/
,id /*主键ID*/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
@ -340,6 +347,7 @@ SELECT
,P1.sub_industry /*子行业*/
,P1.tsm_names_by_alias /*tsm名*/
,P1.customer_category /*客户类别*/
,P1.update_date /**/
,P1.id /*主键ID*/
,P1.Etl_Batch_No /*作业批次号*/
,P1.Etl_First_Dt /*最初入库时间*/
@ -382,6 +390,7 @@ DO UPDATE SET
,sub_industry=excluded.sub_industry
,tsm_names_by_alias=excluded.tsm_names_by_alias
,customer_category=excluded.customer_category
,update_date=excluded.update_date
,Etl_Batch_No=excluded.Etl_Batch_No
,Etl_First_Dt=excluded.Etl_First_Dt
,Etl_Job=excluded.Etl_Job

View File

@ -3,6 +3,7 @@ DROP TABLE IF EXISTS p30_common.d_partner_contact;
CREATE TABLE IF NOT EXISTS p30_common.d_partner_contact (
contact_id bigint
, contact_name VARCHAR(200)
, company VARCHAR(200)
, province VARCHAR(50)
, city VARCHAR(50)
, address text
@ -21,6 +22,7 @@ CREATE TABLE IF NOT EXISTS p30_common.d_partner_contact (
COMMENT ON COLUMN p30_common.d_partner_contact.contact_id IS '联系人ID';
COMMENT ON COLUMN p30_common.d_partner_contact.contact_name IS '联系人名称';
COMMENT ON COLUMN p30_common.d_partner_contact.company IS '公司';
COMMENT ON COLUMN p30_common.d_partner_contact.province IS '省份';
COMMENT ON COLUMN p30_common.d_partner_contact.city IS '城市';
COMMENT ON COLUMN p30_common.d_partner_contact.address IS '地址';

View File

@ -4,11 +4,11 @@
/*Brilliance stems from wisdoms. */
/*************Head Section**************************************************************************/
/*Script Use: Periodically load data to :d_partner_contact(partner one site联系人信息) */
/*Create Date:2025-10-14 18:24:13 */
/*Create Date:2025-10-17 17:24:58 */
/*SDM Developed By: dev */
/*SDM Developed Date: 2025-10-14 */
/*SDM Checked By: dev */
/*SDM Checked Date: 2025-10-14 */
/*SDM Checked Date: 2025-10-17 */
/*Script Developed By: dev */
/*Script Checked By: dev */
/*Source table 1: :PDMDB.t01_partner_customer_and_contact */
@ -44,6 +44,7 @@ ON COMMIT PRESERVE ROWS;
INSERT INTO d_partner_contact_agi_CUR_I (
contact_id /*联系人ID*/
,contact_name /*联系人名称*/
,company /*公司*/
,province /*省份*/
,city /*城市*/
,address /*地址*/
@ -61,6 +62,7 @@ INSERT INTO d_partner_contact_agi_CUR_I (
SELECT
COALESCE(p0.customer_contact_id,0) /*contact_id*/
,COALESCE(TRIM(p0.contact_name),'') /*contact_name*/
,COALESCE(TRIM(p0.customer_name),'') /*company*/
,COALESCE(TRIM(p0.province),'') /*province*/
,COALESCE(TRIM(p0.city),'') /*city*/
,COALESCE(TRIM(p0.address),'') /*address*/
@ -83,6 +85,7 @@ FROM :PDMDB.t01_partner_customer_and_contact p0
/*将不同数据插入到临时表 */
;INSERT INTO d_partner_contact_agi_INS (
contact_name /*联系人名称*/
,company /*公司*/
,province /*省份*/
,city /*城市*/
,address /*地址*/
@ -101,6 +104,7 @@ FROM :PDMDB.t01_partner_customer_and_contact p0
)
SELECT
P1.contact_name /*联系人名称*/
,P1.company /*公司*/
,P1.province /*省份*/
,P1.city /*城市*/
,P1.address /*地址*/
@ -119,6 +123,7 @@ FROM :PDMDB.t01_partner_customer_and_contact p0
FROM d_partner_contact_agi_CUR_I P1
LEFT JOIN :COMMDB.d_partner_contact P2
ON P1.contact_name = P2.contact_name
AND P1.company = P2.company
AND P1.province = P2.province
AND P1.city = P2.city
AND P1.address = P2.address
@ -128,6 +133,7 @@ ON P1.contact_name = P2.contact_name
AND P1.contact_id = P2.contact_id
WHERE P2.contact_name IS NULL
OR P2.company IS NULL
OR P2.province IS NULL
OR P2.city IS NULL
OR P2.address IS NULL
@ -140,6 +146,7 @@ WHERE P2.contact_name IS NULL
/*将新增数据插入到目标表 */
;INSERT INTO :COMMDB.d_partner_contact (
contact_name /*联系人名称*/
,company /*公司*/
,province /*省份*/
,city /*城市*/
,address /*地址*/
@ -158,6 +165,7 @@ WHERE P2.contact_name IS NULL
)
SELECT
P1.contact_name /*联系人名称*/
,P1.company /*公司*/
,P1.province /*省份*/
,P1.city /*城市*/
,P1.address /*地址*/
@ -178,6 +186,7 @@ ON CONFLICT ( contact_id)
DO UPDATE SET
contact_id=excluded.contact_id
,contact_name=excluded.contact_name
,company=excluded.company
,province=excluded.province
,city=excluded.city
,address=excluded.address

View File

@ -26,8 +26,10 @@ default_args = {
dag = DAG('wf_dag_partner1site', default_args=default_args,
schedule_interval="0 1 * * *",
catchup=False,
dagrun_timeout=timedelta(minutes=160),
max_active_runs=3)
dagrun_timeout=timedelta(minutes=600),
max_active_runs=3,
tags=['64cbd1bbced14209b5e3a879f89e8ab1','TK_Cust','partner1site']
)
task_failed = EmailOperator (
dag=dag,
@ -60,7 +62,7 @@ part_summary_visit_feign >> part_summary_visit_load
partner_summary_visit_9060 = SSHOperator(
ssh_hook=sshHook,
task_id='partner_summary_visit_9060',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ',
params={'my_param':"S98_S_partner_summary_visit"},
depends_on_past=False,
retries=3,
@ -88,7 +90,7 @@ part_summary_report_feign >> part_summary_report_load
partner_summary_report_6257 = SSHOperator(
ssh_hook=sshHook,
task_id='partner_summary_report_6257',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ',
params={'my_param':"S98_S_partner_summary_report"},
depends_on_past=False,
retries=3,
@ -116,7 +118,7 @@ part_summary_pos_feign >> part_summary_pos_load
partner_summary_pos_4937 = SSHOperator(
ssh_hook=sshHook,
task_id='partner_summary_pos_4937',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ',
params={'my_param':"S98_S_partner_summary_pos"},
depends_on_past=False,
retries=3,
@ -144,7 +146,7 @@ part_summary_custome_feign >> part_summary_custome_load
partner_summary_customer_5702 = SSHOperator(
ssh_hook=sshHook,
task_id='partner_summary_customer_5702',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ',
params={'my_param':"S98_S_partner_summary_customer"},
depends_on_past=False,
retries=3,
@ -153,7 +155,7 @@ dag=dag)
t01_partner_pos = SSHOperator(
ssh_hook=sshHook,
task_id='t01_partner_pos',
command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ',
params={'my_param':"t01_partner_pos_agi"},
depends_on_past=False,
retries=3,
@ -161,7 +163,7 @@ dag=dag)
t01_partner_customer_and_contact = SSHOperator(
ssh_hook=sshHook,
task_id='t01_partner_customer_and_contact',
command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ',
params={'my_param':"t01_partner_customer_and_contact_agi"},
depends_on_past=False,
retries=3,
@ -169,7 +171,7 @@ dag=dag)
t01_partner_report = SSHOperator(
ssh_hook=sshHook,
task_id='t01_partner_report',
command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ',
params={'my_param':"t01_partner_report_agi"},
depends_on_past=False,
retries=3,
@ -177,7 +179,7 @@ dag=dag)
t01_partner_visit = SSHOperator(
ssh_hook=sshHook,
task_id='t01_partner_visit',
command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ',
params={'my_param':"t01_partner_visit_agi"},
depends_on_past=False,
retries=3,
@ -185,7 +187,7 @@ dag=dag)
d_partner_contact = SSHOperator(
ssh_hook=sshHook,
task_id='d_partner_contact',
command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ',
params={'my_param':"d_partner_contact_agi"},
depends_on_past=False,
retries=3,