diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/SCRM客户标签信息/Dag_Sql_ccc_scrm_cust_label_info.py b/dev/workflow/TK_Cust/smart_ccc_custom/SCRM客户标签信息/Dag_Sql_ccc_scrm_cust_label_info.py new file mode 100644 index 0000000..6ca024c --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/SCRM客户标签信息/Dag_Sql_ccc_scrm_cust_label_info.py @@ -0,0 +1,51 @@ +#!/usr/bin/python +# -*- encoding=utf-8 -*- +from airflow import DAG +from datetime import datetime, timedelta +from airflow.contrib.hooks.ssh_hook import SSHHook +from airflow.contrib.operators.ssh_operator import SSHOperator +import json + +from airflow.operators.email_operator import EmailOperator +from airflow.utils.trigger_rule import TriggerRule + + + + +sshHook = SSHHook(ssh_conn_id ='ssh_85_air') +default_args = { +'owner': 'sewp_dev', +'email': [], +'start_date': datetime(2022, 9, 12), +'depends_on_past': False, +'retries': 6, +'retry_delay': timedelta(minutes=10), +} + + +dag = DAG('Dag_Sql_ccc_scrm_cust_label_info', default_args=default_args, +schedule_interval="${cronExp}", +catchup=False, +dagrun_timeout=timedelta(minutes=160), +max_active_runs=3) + + +Sql_ccc_scrm_cust_label_info = SSHOperator( +ssh_hook=sshHook, +task_id='Sql_ccc_scrm_cust_label_info', +command='python /data/airflow/etl/PDM/{{params.my_param}}.py {{ ds_nodash }} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"sql_ccc_scrm_cust_label_info"}, +depends_on_past=False, +retries=3, +dag=dag) + +task_failed = EmailOperator ( + dag=dag, + trigger_rule=TriggerRule.ONE_FAILED, + task_id="SCRM客户标签信息_failed", + to=["${etlEmail}"], + cc=[""], + subject="SCRM客户标签信息_failed", + html_content='<h3>SCRM客户标签信息_failed" </h3>') + +Sql_ccc_scrm_cust_label_info >> task_failed \ No newline at end of file diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/SCRM客户标签信息/dysql_ccc_scrm_cust_label_info.sql b/dev/workflow/TK_Cust/smart_ccc_custom/SCRM客户标签信息/dysql_ccc_scrm_cust_label_info.sql new file mode 100644 index 0000000..a98d366 --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/SCRM客户标签信息/dysql_ccc_scrm_cust_label_info.sql @@ -0,0 +1,43 @@ +/********************************************************************************************/ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +drop table if exists ccc_scrm_cust_label_info_t1; +create temporary table ccc_scrm_cust_label_info_t1 +as +select p1.cust_id +,p1.cust_external_id +,p2.leads_id scrm_leads_id +,p1.cust_label_id +,p1.cust_label_name +,p3.scrm_label_id scrm_label_id +,'0' as sync_ind +,0 Etl_Batch_No +,current_date Etl_First_Dt +,'ccc_scrm_cust_label_info' Etl_Job +,current_timestamp(0) Etl_Proc_Dt +,current_date Etl_Tx_Dt +,Substr('com',1,3) Src_Sysname +,'ccc_scrm_cust_label_info' Src_Table +from p30_common.cust_label_rela p1 +left join p61_output.scrm_leads_external_id_mapping p2 +on p1.cust_external_id =p2.external_id +left join p20_pdm.t01_scrm_label p3 +on p1.cust_label_name =p3.scrm_label_name ; + +insert into p61_output.ccc_scrm_cust_label_info +(cust_id, cust_external_id, scrm_leads_id, cust_label_id, cust_label_name, scrm_label_id, sync_ind, etl_batch_no, etl_first_dt, etl_job, etl_proc_dt, etl_tx_dt, src_sysname, src_table) +select p1.cust_id, p1.cust_external_id, p1.scrm_leads_id +, p1.cust_label_id, p1.cust_label_name, p1.scrm_label_id +, p1.sync_ind, p1.etl_batch_no, p1.etl_first_dt, p1.etl_job +, p1.etl_proc_dt, p1.etl_tx_dt, p1.src_sysname, p1.src_table +from ccc_scrm_cust_label_info_t1 p1 +left join p61_output.ccc_scrm_cust_label_info p2 +on p1.cust_id = p2.cust_id +and p1.cust_label_id=p2.cust_label_id +where p2.cust_id is null +; + +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/天润Smart-ccc客户数据/wf_dag_smart_ccc_custom.py b/dev/workflow/TK_Cust/smart_ccc_custom/天润Smart-ccc客户数据/wf_dag_smart_ccc_custom.py index 3383ce2..4e01f74 100644 --- a/dev/workflow/TK_Cust/smart_ccc_custom/天润Smart-ccc客户数据/wf_dag_smart_ccc_custom.py +++ b/dev/workflow/TK_Cust/smart_ccc_custom/天润Smart-ccc客户数据/wf_dag_smart_ccc_custom.py @@ -110,8 +110,22 @@ params={'my_param':"t01_ccc_cust_info_agi"}, depends_on_past=False, retries=3, dag=dag) + +dysql_ccc_scrm_cust_label_info = SSHOperator( +ssh_hook=sshHook, +task_id='dysql_ccc_scrm_cust_label_info', +command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"dysql_ccc_scrm_cust_label_info"}, +depends_on_past=False, +retries=3, +dag=dag) + + + customer_labels_load >> tr_custom_labels_8280 customer_list_load >> tr_custom_details_5516 tr_custom_labels_8280 >> t01_ccc_cust_label tr_custom_details_5516 >> t01_ccc_cust_info -t01_ccc_cust_info >> task_failed +t01_ccc_cust_label >> dysql_ccc_scrm_cust_label_info +t01_ccc_cust_info >> dysql_ccc_scrm_cust_label_info +dysql_ccc_scrm_cust_label_info >> task_failed