From 13c9fb9207d7b5addebf81394c55500a63243121 Mon Sep 17 00:00:00 2001 From: root <root@94a9702fdab3> Date: Mon, 8 Jul 2024 19:53:59 +0800 Subject: [PATCH] =?UTF-8?q?add=20workflow=20=E5=A4=A9=E6=B6=A6Smart-ccc?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E6=95=B0=E6=8D=AE,dev?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../S98_S_tr_custom_details.sql | 4 +- .../wf_dag_smart_ccc_custom.py | 43 ++++++++++++++++--- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/S98_S_tr_custom_details.sql b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/S98_S_tr_custom_details.sql index 77efd5b..e4f38d0 100644 --- a/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/S98_S_tr_custom_details.sql +++ b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/S98_S_tr_custom_details.sql @@ -85,9 +85,9 @@ insert into p10_sa.S98_S_tr_custom_details , label_ids , etl_tx_dt from p00_tal.S98_S_tr_custom_details - ; + ; delete from p12_sfull.S98_S_tr_custom_details -; + where (id) in (select id from p10_sa.S98_S_tr_custom_details) ; ; insert into p12_sfull.S98_S_tr_custom_details ( id diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/天润Smart-ccc客户数据/wf_dag_smart_ccc_custom.py b/dev/workflow/TK_Cust/smart_ccc_custom/天润Smart-ccc客户数据/wf_dag_smart_ccc_custom.py index 51a4aa6..c876008 100644 --- a/dev/workflow/TK_Cust/smart_ccc_custom/天润Smart-ccc客户数据/wf_dag_smart_ccc_custom.py +++ b/dev/workflow/TK_Cust/smart_ccc_custom/天润Smart-ccc客户数据/wf_dag_smart_ccc_custom.py @@ -37,7 +37,7 @@ task_failed = EmailOperator ( cc=[""], subject="smart_ccc_custom_failed", html_content='<h3>您好,smart_ccc_custom作业失败,请及时处理" </h3>') - + customer_list_feign = SSHOperator( ssh_hook=sshHook, task_id='customer_list_feign', @@ -46,7 +46,17 @@ depends_on_past=False, retries=3, dag=dag) +customer_list_load = SSHOperator( +ssh_hook=sshHook, +task_id='customer_list_load', +command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}', +params={'my_param':"customer_list_load"}, +depends_on_past=False, +retries=3, +dag=dag) +customer_list_feign >> customer_list_load + customer_labels_feign = SSHOperator( ssh_hook=sshHook, task_id='customer_labels_feign', @@ -65,7 +75,7 @@ retries=3, dag=dag) customer_labels_feign >> customer_labels_load - + tr_custom_details_5516 = SSHOperator( ssh_hook=sshHook, task_id='tr_custom_details_5516', @@ -74,7 +84,7 @@ params={'my_param':"S98_S_tr_custom_details"}, depends_on_past=False, retries=3, dag=dag) - + tr_custom_labels_8280 = SSHOperator( ssh_hook=sshHook, task_id='tr_custom_labels_8280', @@ -83,8 +93,27 @@ params={'my_param':"S98_S_tr_custom_labels"}, depends_on_past=False, retries=3, dag=dag) + +customer_detail_feign = SSHOperator( +ssh_hook=sshHook, +task_id='customer_detail_feign', +command='python3 /data/airflow/etl/API/customer_detail_feign.py', +depends_on_past=False, +retries=3, +dag=dag) -customer_labels_load >> tr_custom_labels_8280 -customer_list_feign >> tr_custom_details_5516 -tr_custom_details_5516 >> task_failed -tr_custom_labels_8280 >> task_failed +customer_detail_load = SSHOperator( +ssh_hook=sshHook, +task_id='customer_detail_load', +command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}', +params={'my_param':"customer_detail_load"}, +depends_on_past=False, +retries=3, +dag=dag) + +customer_detail_feign >> customer_detail_load + +customer_labels_load >> tr_custom_labels_8280 +customer_list_load >> tr_custom_details_5516 +tr_custom_details_5516 >> task_failed +customer_detail_load >> task_failed