From 28d7ed5d1f5bbeba5fc0a8bc8d6b15c7653b392a Mon Sep 17 00:00:00 2001 From: root Date: Mon, 15 Jan 2024 14:23:30 +0800 Subject: [PATCH] =?UTF-8?q?add=20workflow=20=E6=B3=B0=E5=85=8BCRM,dev?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TK_Cust/dev/tk_crm/泰克CRM/wf_dag_tk_crm.py | 160 ++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 TK_Cust/dev/tk_crm/泰克CRM/wf_dag_tk_crm.py diff --git a/TK_Cust/dev/tk_crm/泰克CRM/wf_dag_tk_crm.py b/TK_Cust/dev/tk_crm/泰克CRM/wf_dag_tk_crm.py new file mode 100644 index 0000000..cf304a9 --- /dev/null +++ b/TK_Cust/dev/tk_crm/泰克CRM/wf_dag_tk_crm.py @@ -0,0 +1,160 @@ +#!/usr/bin/python +# -*- encoding=utf-8 -*- +from airflow import DAG +from datetime import datetime, timedelta +from airflow.contrib.hooks.ssh_hook import SSHHook +from airflow.contrib.operators.ssh_operator import SSHOperator +from airflow.sensors.external_task_sensor import ExternalTaskSensor +import json + +from airflow.operators.email_operator import EmailOperator +from airflow.utils.trigger_rule import TriggerRule + + +sshHook = SSHHook(ssh_conn_id ='ssh_air') +default_args = { +'owner': 'info@idgvalue.com', +'email': [''], +'email_on_failure': True, +'email_on_retry':True, +'start_date': datetime(2022, 9, 12), +'depends_on_past': False, +'retries': 6, +'retry_delay': timedelta(minutes=10), +} + +dag = DAG('wf_dag_tk_crm', default_args=default_args, +schedule_interval="0 0 * * *", +catchup=False, +dagrun_timeout=timedelta(minutes=160), +max_active_runs=3) + +task_failed = EmailOperator ( + dag=dag, + trigger_rule=TriggerRule.ONE_FAILED, + task_id="task_failed", + to=["info@idgvalue.com"], + cc=[""], + subject="tk_crm_failed", + html_content='

您好,tk_crm作业失败,请及时处理"

') + +file_CRM_account = SSHOperator( +ssh_hook=sshHook, +task_id='file_CRM_account', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"CRM_account"}, +depends_on_past=False, +retries=3, +dag=dag) + + +file_CRM_contact_ccp = SSHOperator( +ssh_hook=sshHook, +task_id='file_CRM_contact_ccp', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"CRM_contact_ccp"}, +depends_on_past=False, +retries=3, +dag=dag) + + +file_CRM_contact_part1 = SSHOperator( +ssh_hook=sshHook, +task_id='file_CRM_contact_part1', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"CRM_contact_part1"}, +depends_on_past=False, +retries=3, +dag=dag) + + +file_CCP_mapping_table = SSHOperator( +ssh_hook=sshHook, +task_id='file_CCP_mapping_table', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"CCP_mapping_table"}, +depends_on_past=False, +retries=3, +dag=dag) + + +crm_account_4545 = SSHOperator( +ssh_hook=sshHook, +task_id='crm_account_4545', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_crm_account"}, +depends_on_past=False, +retries=3, +dag=dag) + +crm_contact_ccp_5681 = SSHOperator( +ssh_hook=sshHook, +task_id='crm_contact_ccp_5681', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_crm_contact_ccp"}, +depends_on_past=False, +retries=3, +dag=dag) + +ccp_mapping_table_8972 = SSHOperator( +ssh_hook=sshHook, +task_id='ccp_mapping_table_8972', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_ccp_mapping_table"}, +depends_on_past=False, +retries=3, +dag=dag) + +crm_contact_part_605 = SSHOperator( +ssh_hook=sshHook, +task_id='crm_contact_part_605', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_crm_contact_part"}, +depends_on_past=False, +retries=3, +dag=dag) + +file_country_cde = SSHOperator( +ssh_hook=sshHook, +task_id='file_country_cde', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"country_cde"}, +depends_on_past=False, +retries=3, +dag=dag) + + +country_cde_3310 = SSHOperator( +ssh_hook=sshHook, +task_id='country_cde_3310', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_country_cde"}, +depends_on_past=False, +retries=3, +dag=dag) + +file_china_city = SSHOperator( +ssh_hook=sshHook, +task_id='file_china_city', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"china_city"}, +depends_on_past=False, +retries=3, +dag=dag) + + +t01_crm_contact = SSHOperator( +ssh_hook=sshHook, +task_id='t01_crm_contact', +command='/data/airflow/etl/PDM/run_sa.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"t01_crm_contact_agi"}, +depends_on_past=False, +retries=3, +dag=dag) +file_CRM_account >> crm_account_4545 +file_CRM_contact_ccp >> crm_contact_ccp_5681 +file_CCP_mapping_table >> ccp_mapping_table_8972 +file_CRM_contact_part1 >> crm_contact_part_605 +file_country_cde >> country_cde_3310 +crm_contact_ccp_5681 >> t01_crm_contact +t01_crm_contact >> task_failed