diff --git a/dev/workflow/TK_Cust/tk_crm/泰克CRM/wf_dag_tk_crm.py b/dev/workflow/TK_Cust/tk_crm/泰克CRM/wf_dag_tk_crm.py index a662344..0e80f06 100644 --- a/dev/workflow/TK_Cust/tk_crm/泰克CRM/wf_dag_tk_crm.py +++ b/dev/workflow/TK_Cust/tk_crm/泰克CRM/wf_dag_tk_crm.py @@ -14,7 +14,7 @@ from airflow.utils.trigger_rule import TriggerRule sshHook = SSHHook(ssh_conn_id ='ssh_air') default_args = { -'owner': 'info@idgvalue.com', +'owner': 'tek_newsletter@163.com', 'email_on_failure': True, 'email_on_retry':True, 'start_date': datetime(2024, 1, 1), @@ -24,16 +24,18 @@ default_args = { } dag = DAG('wf_dag_tk_crm', default_args=default_args, -schedule_interval="0 18 * * 5", +schedule_interval="0 18 * * 3,5", catchup=False, -dagrun_timeout=timedelta(minutes=160), -max_active_runs=3) +dagrun_timeout=timedelta(minutes=600), +max_active_runs=3, +tags=['64cbd1bbced14209b5e3a879f89e8ab1','TK_Cust','泰克CRM'] +) task_failed = EmailOperator ( dag=dag, trigger_rule=TriggerRule.ONE_FAILED, task_id="task_failed", - to=["info@idgvalue.com"], + to=["tek_newsletter@163.com"], cc=[""], subject="tk_crm_failed", html_content='

您好,tk_crm作业失败,请及时处理"

') @@ -41,7 +43,7 @@ task_failed = EmailOperator ( file_CRM_account = SSHOperator( ssh_hook=sshHook, task_id='file_CRM_account', -command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} ', params={'my_param':"CRM_account"}, depends_on_past=False, retries=3, @@ -51,7 +53,7 @@ dag=dag) file_CRM_contact_ccp = SSHOperator( ssh_hook=sshHook, task_id='file_CRM_contact_ccp', -command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} ', params={'my_param':"CRM_contact_ccp"}, depends_on_past=False, retries=3, @@ -61,7 +63,7 @@ dag=dag) file_CRM_contact_part1 = SSHOperator( ssh_hook=sshHook, task_id='file_CRM_contact_part1', -command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} ', params={'my_param':"CRM_contact_part1"}, depends_on_past=False, retries=3, @@ -71,7 +73,7 @@ dag=dag) file_CCP_mapping_table = SSHOperator( ssh_hook=sshHook, task_id='file_CCP_mapping_table', -command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} ', params={'my_param':"CCP_mapping_table"}, depends_on_past=False, retries=3, @@ -81,7 +83,7 @@ dag=dag) crm_account_4545 = SSHOperator( ssh_hook=sshHook, task_id='crm_account_4545', -command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ', params={'my_param':"S98_S_crm_account"}, depends_on_past=False, retries=3, @@ -90,7 +92,7 @@ dag=dag) crm_contact_ccp_5681 = SSHOperator( ssh_hook=sshHook, task_id='crm_contact_ccp_5681', -command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ', params={'my_param':"S98_S_crm_contact_ccp"}, depends_on_past=False, retries=3, @@ -99,7 +101,7 @@ dag=dag) ccp_mapping_table_8972 = SSHOperator( ssh_hook=sshHook, task_id='ccp_mapping_table_8972', -command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ', params={'my_param':"S98_S_ccp_mapping_table"}, depends_on_past=False, retries=3, @@ -108,7 +110,7 @@ dag=dag) crm_contact_part_605 = SSHOperator( ssh_hook=sshHook, task_id='crm_contact_part_605', -command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ', params={'my_param':"S98_S_crm_contact_part"}, depends_on_past=False, retries=3, @@ -117,7 +119,7 @@ dag=dag) file_country_cde = SSHOperator( ssh_hook=sshHook, task_id='file_country_cde', -command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} ', params={'my_param':"country_cde"}, depends_on_past=False, retries=3, @@ -127,7 +129,7 @@ dag=dag) country_cde_3310 = SSHOperator( ssh_hook=sshHook, task_id='country_cde_3310', -command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ', params={'my_param':"S98_S_country_cde"}, depends_on_past=False, retries=3, @@ -136,7 +138,7 @@ dag=dag) file_china_city = SSHOperator( ssh_hook=sshHook, task_id='file_china_city', -command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} ', params={'my_param':"china_city"}, depends_on_past=False, retries=3, @@ -146,7 +148,7 @@ dag=dag) t01_crm_contact = SSHOperator( ssh_hook=sshHook, task_id='t01_crm_contact', -command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"t01_crm_contact_agi"}, depends_on_past=False, retries=3, @@ -154,7 +156,7 @@ dag=dag) T01_CRM_CCP_TYPE = SSHOperator( ssh_hook=sshHook, task_id='T01_CRM_CCP_TYPE', -command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"T01_CRM_CCP_TYPE_agi"}, depends_on_past=False, retries=3, @@ -162,7 +164,7 @@ dag=dag) t01_crm_cust_ccp = SSHOperator( ssh_hook=sshHook, task_id='t01_crm_cust_ccp', -command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"t01_crm_cust_ccp_agi"}, depends_on_past=False, retries=3, @@ -170,7 +172,7 @@ dag=dag) t01_crm_account = SSHOperator( ssh_hook=sshHook, task_id='t01_crm_account', -command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"t01_crm_account_agi"}, depends_on_past=False, retries=3, @@ -178,7 +180,7 @@ dag=dag) file_crm_opp = SSHOperator( ssh_hook=sshHook, task_id='file_crm_opp', -command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} ', params={'my_param':"crm_opp"}, depends_on_past=False, retries=3, @@ -188,7 +190,7 @@ dag=dag) crm_opp = SSHOperator( ssh_hook=sshHook, task_id='crm_opp', -command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ', params={'my_param':"S98_S_crm_opp"}, depends_on_past=False, retries=3, @@ -197,7 +199,7 @@ dag=dag) t01_crm_opportunity = SSHOperator( ssh_hook=sshHook, task_id='t01_crm_opportunity', -command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"t01_crm_opportunity_agi"}, depends_on_past=False, retries=3, @@ -205,7 +207,7 @@ dag=dag) d_crm_contact = SSHOperator( ssh_hook=sshHook, task_id='d_crm_contact', -command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"d_crm_contact_agi"}, depends_on_past=False, retries=3, @@ -213,7 +215,7 @@ dag=dag) cust_contact_mapping = SSHOperator( ssh_hook=sshHook, task_id='cust_contact_mapping', -command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"cust_contact_mapping_agi"}, depends_on_past=False, retries=3, @@ -221,7 +223,7 @@ dag=dag) cust_contact_info = SSHOperator( ssh_hook=sshHook, task_id='cust_contact_info', -command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"cust_contact_info_agi"}, depends_on_past=False, retries=3, @@ -233,7 +235,7 @@ dag=dag) file_CRM_Raw_Leads = SSHOperator( ssh_hook=sshHook, task_id='file_CRM_Raw_Leads', -command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} ', params={'my_param':"CRM_Raw_Leads"}, depends_on_past=False, retries=3, @@ -243,7 +245,7 @@ dag=dag) crm_raw_leads_6024 = SSHOperator( ssh_hook=sshHook, task_id='crm_raw_leads_6024', -command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ', params={'my_param':"S98_S_crm_raw_leads"}, depends_on_past=False, retries=3, @@ -252,7 +254,7 @@ dag=dag) t01_crm_raw_leads = SSHOperator( ssh_hook=sshHook, task_id='t01_crm_raw_leads', -command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"t01_crm_raw_leads_agi"}, depends_on_past=False, retries=3, @@ -260,7 +262,7 @@ dag=dag) cust_leads = SSHOperator( ssh_hook=sshHook, task_id='cust_leads', -command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"cust_leads_agi"}, depends_on_past=False, retries=3, @@ -268,7 +270,7 @@ dag=dag) cust_all_info = SSHOperator( ssh_hook=sshHook, task_id='cust_all_info', -command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"cust_all_info_agi"}, depends_on_past=False, retries=3, @@ -276,7 +278,7 @@ dag=dag) cust_leads_detail = SSHOperator( ssh_hook=sshHook, task_id='cust_leads_detail', -command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"cust_leads_detail_agi"}, depends_on_past=False, retries=3, @@ -284,7 +286,7 @@ dag=dag) cust_enagement_records = SSHOperator( ssh_hook=sshHook, task_id='cust_enagement_records', -command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"cust_enagement_records_agi"}, depends_on_past=False, retries=3, @@ -292,11 +294,21 @@ dag=dag) data_source_update = SSHOperator( ssh_hook=sshHook, task_id='data_source_update', -command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', params={'my_param':"data_source_update_agi"}, depends_on_past=False, retries=3, dag=dag) +file_crm_order = SSHOperator( +ssh_hook=sshHook, +task_id='file_crm_order', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} ', +params={'my_param':"crm_order"}, +depends_on_past=False, +retries=3, +dag=dag) + + file_CRM_account >> crm_account_4545 file_CRM_contact_ccp >> crm_contact_ccp_5681 file_CCP_mapping_table >> ccp_mapping_table_8972 @@ -323,3 +335,4 @@ t01_crm_raw_leads >> cust_all_info cust_all_info >> cust_enagement_records cust_enagement_records >> task_failed file_china_city >> task_failed +file_crm_order >> task_failed