add workflow 泰克CRM,dev
This commit is contained in:
		
							parent
							
								
									ba50c1b124
								
							
						
					
					
						commit
						d332599034
					
				|  | @ -0,0 +1,50 @@ | ||||||
|  | /*******Main Section**************************************************************************/ | ||||||
|  | \set ON_ERROR_STOP on | ||||||
|  | \set AUTOCOMMIT on | ||||||
|  | \timing on | ||||||
|  | delete from p10_sa.S98_S_country_cde | ||||||
|  | ; | ||||||
|  | insert into p10_sa.S98_S_country_cde | ||||||
|  | (	 country_cn_name   | ||||||
|  | 	, country_en_name   | ||||||
|  | 	, country_cd   | ||||||
|  | 	, country_abbr   | ||||||
|  | 	, country_number   | ||||||
|  | 	, inter_tel_cd   | ||||||
|  | 	, internet_name   | ||||||
|  | 	, etl_tx_dt   ) | ||||||
|  |   select  | ||||||
|  | 	 country_cn_name   | ||||||
|  | 	, country_en_name   | ||||||
|  | 	, country_cd   | ||||||
|  | 	, country_abbr   | ||||||
|  | 	, country_number   | ||||||
|  | 	, inter_tel_cd   | ||||||
|  | 	, internet_name   | ||||||
|  | 	, etl_tx_dt   | ||||||
|  |   from p00_tal.S98_S_country_cde | ||||||
|  |   ; | ||||||
|  |   delete from p12_sfull.S98_S_country_cde | ||||||
|  | ; | ||||||
|  | ; | ||||||
|  | insert into p12_sfull.S98_S_country_cde | ||||||
|  | (	 country_cn_name   | ||||||
|  | 	, country_en_name   | ||||||
|  | 	, country_cd   | ||||||
|  | 	, country_abbr   | ||||||
|  | 	, country_number   | ||||||
|  | 	, inter_tel_cd   | ||||||
|  | 	, internet_name   | ||||||
|  | 	, etl_tx_dt   ) | ||||||
|  |   select  | ||||||
|  | 	 country_cn_name   | ||||||
|  | 	, country_en_name   | ||||||
|  | 	, country_cd   | ||||||
|  | 	, country_abbr   | ||||||
|  | 	, country_number   | ||||||
|  | 	, inter_tel_cd   | ||||||
|  | 	, internet_name   | ||||||
|  | 	, etl_tx_dt   | ||||||
|  |   from p10_sa.S98_S_country_cde | ||||||
|  | ; | ||||||
|  | \q | ||||||
|  | @ -0,0 +1,19 @@ | ||||||
|  | 
 | ||||||
|  | CREATE FOREIGN TABLE if not exists p00_tal.S98_S_country_cde ( | ||||||
|  |  country_cn_name  TEXT  | ||||||
|  | 	, country_en_name  TEXT  | ||||||
|  | 	, country_cd  TEXT  | ||||||
|  | 	, country_abbr  TEXT  | ||||||
|  | 	, country_number  TEXT  | ||||||
|  | 	, inter_tel_cd  TEXT  | ||||||
|  | 	, internet_name  TEXT  | ||||||
|  | 	, etl_tx_dt  TIMESTAMP  | ||||||
|  | )  | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'country_cde' );  | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @ -0,0 +1,47 @@ | ||||||
|  | 
 | ||||||
|  | create table if not exists p10_sa.S98_S_country_cde ( | ||||||
|  | 	 country_cn_name  TEXT  | ||||||
|  | 	, country_en_name  TEXT  | ||||||
|  | 	, country_cd  TEXT  | ||||||
|  | 	, country_abbr  TEXT  | ||||||
|  | 	, country_number  TEXT  | ||||||
|  | 	, inter_tel_cd  TEXT  | ||||||
|  | 	, internet_name  TEXT  | ||||||
|  | 	, etl_tx_dt  TIMESTAMP  | ||||||
|  | ) ;  | ||||||
|  | 
 | ||||||
|  |  COMMENT ON COLUMN p10_sa.S98_S_country_cde.country_cn_name IS '国家';  | ||||||
|  |  COMMENT ON COLUMN p10_sa.S98_S_country_cde.country_en_name IS '国家英文名';  | ||||||
|  |  COMMENT ON COLUMN p10_sa.S98_S_country_cde.country_cd IS '国家代码';  | ||||||
|  |  COMMENT ON COLUMN p10_sa.S98_S_country_cde.country_abbr IS '国家缩写';  | ||||||
|  |  COMMENT ON COLUMN p10_sa.S98_S_country_cde.country_number IS '数字代码';  | ||||||
|  |  COMMENT ON COLUMN p10_sa.S98_S_country_cde.inter_tel_cd IS '国际区号';  | ||||||
|  |  COMMENT ON COLUMN p10_sa.S98_S_country_cde.internet_name IS '域名';  | ||||||
|  |  COMMENT ON COLUMN p10_sa.S98_S_country_cde.etl_tx_dt IS '';  | ||||||
|  | 
 | ||||||
|  | COMMENT ON TABLE p10_sa.S98_S_country_cde IS ''; | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | create table if not exists p12_sfull.S98_S_country_cde ( | ||||||
|  | 	 country_cn_name  TEXT  | ||||||
|  | 	, country_en_name  TEXT  | ||||||
|  | 	, country_cd  TEXT  | ||||||
|  | 	, country_abbr  TEXT  | ||||||
|  | 	, country_number  TEXT  | ||||||
|  | 	, inter_tel_cd  TEXT  | ||||||
|  | 	, internet_name  TEXT  | ||||||
|  | 	, etl_tx_dt  TIMESTAMP  | ||||||
|  | ) ;  | ||||||
|  | 
 | ||||||
|  |  COMMENT ON COLUMN p12_sfull.S98_S_country_cde.country_cn_name IS '国家';  | ||||||
|  |  COMMENT ON COLUMN p12_sfull.S98_S_country_cde.country_en_name IS '国家英文名';  | ||||||
|  |  COMMENT ON COLUMN p12_sfull.S98_S_country_cde.country_cd IS '国家代码';  | ||||||
|  |  COMMENT ON COLUMN p12_sfull.S98_S_country_cde.country_abbr IS '国家缩写';  | ||||||
|  |  COMMENT ON COLUMN p12_sfull.S98_S_country_cde.country_number IS '数字代码';  | ||||||
|  |  COMMENT ON COLUMN p12_sfull.S98_S_country_cde.inter_tel_cd IS '国际区号';  | ||||||
|  |  COMMENT ON COLUMN p12_sfull.S98_S_country_cde.internet_name IS '域名';  | ||||||
|  |  COMMENT ON COLUMN p12_sfull.S98_S_country_cde.etl_tx_dt IS '';  | ||||||
|  | 
 | ||||||
|  | COMMENT ON TABLE p12_sfull.S98_S_country_cde IS ''; | ||||||
|  | 
 | ||||||
|  | @ -0,0 +1,151 @@ | ||||||
|  | #!/usr/bin/python | ||||||
|  | # -*- encoding=utf-8 -*- | ||||||
|  | from airflow import DAG | ||||||
|  | from datetime import datetime, timedelta | ||||||
|  | from airflow.contrib.hooks.ssh_hook import SSHHook | ||||||
|  | from airflow.contrib.operators.ssh_operator import SSHOperator | ||||||
|  | from airflow.sensors.external_task_sensor import ExternalTaskSensor | ||||||
|  | import json | ||||||
|  | 
 | ||||||
|  | from airflow.operators.email_operator import EmailOperator | ||||||
|  | from airflow.utils.trigger_rule import TriggerRule | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | sshHook = SSHHook(ssh_conn_id ='ssh_air') | ||||||
|  | default_args = { | ||||||
|  | 'owner': 'info@idgvalue.com', | ||||||
|  | 'email': [''], | ||||||
|  | 'email_on_failure': True, | ||||||
|  | 'email_on_retry':True, | ||||||
|  | 'start_date': datetime(2022, 9, 12), | ||||||
|  | 'depends_on_past': False, | ||||||
|  | 'retries': 6, | ||||||
|  | 'retry_delay': timedelta(minutes=10), | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | dag = DAG('wf_dag_tk_crm', default_args=default_args, | ||||||
|  | schedule_interval="0 0 * * *", | ||||||
|  | catchup=False, | ||||||
|  | dagrun_timeout=timedelta(minutes=160), | ||||||
|  | max_active_runs=3) | ||||||
|  | 
 | ||||||
|  | task_failed = EmailOperator ( | ||||||
|  |     dag=dag, | ||||||
|  |     trigger_rule=TriggerRule.ONE_FAILED, | ||||||
|  |     task_id="task_failed", | ||||||
|  |     to=["info@idgvalue.com"], | ||||||
|  |     cc=[""], | ||||||
|  |     subject="tk_crm_failed", | ||||||
|  |     html_content='<h3>您好,tk_crm作业失败,请及时处理" </h3>') | ||||||
|  | 
 | ||||||
|  | file_CRM_account = SSHOperator( | ||||||
|  | ssh_hook=sshHook, | ||||||
|  | task_id='file_CRM_account', | ||||||
|  | command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }}  >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', | ||||||
|  | params={'my_param':"CRM_account"}, | ||||||
|  | depends_on_past=False,  | ||||||
|  | retries=3,  | ||||||
|  | dag=dag) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | file_CRM_contact_ccp = SSHOperator( | ||||||
|  | ssh_hook=sshHook, | ||||||
|  | task_id='file_CRM_contact_ccp', | ||||||
|  | command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }}  >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', | ||||||
|  | params={'my_param':"CRM_contact_ccp"}, | ||||||
|  | depends_on_past=False,  | ||||||
|  | retries=3,  | ||||||
|  | dag=dag) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | file_CRM_contact_part1 = SSHOperator( | ||||||
|  | ssh_hook=sshHook, | ||||||
|  | task_id='file_CRM_contact_part1', | ||||||
|  | command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }}  >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', | ||||||
|  | params={'my_param':"CRM_contact_part1"}, | ||||||
|  | depends_on_past=False,  | ||||||
|  | retries=3,  | ||||||
|  | dag=dag) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | file_CCP_mapping_table = SSHOperator( | ||||||
|  | ssh_hook=sshHook, | ||||||
|  | task_id='file_CCP_mapping_table', | ||||||
|  | command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }}  >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', | ||||||
|  | params={'my_param':"CCP_mapping_table"}, | ||||||
|  | depends_on_past=False,  | ||||||
|  | retries=3,  | ||||||
|  | dag=dag) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | crm_account_4545 = SSHOperator( | ||||||
|  | ssh_hook=sshHook, | ||||||
|  | task_id='crm_account_4545', | ||||||
|  | command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', | ||||||
|  | params={'my_param':"S98_S_crm_account"}, | ||||||
|  | depends_on_past=False,  | ||||||
|  | retries=3,  | ||||||
|  | dag=dag) | ||||||
|  | 
 | ||||||
|  | crm_contact_ccp_5681 = SSHOperator( | ||||||
|  | ssh_hook=sshHook, | ||||||
|  | task_id='crm_contact_ccp_5681', | ||||||
|  | command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', | ||||||
|  | params={'my_param':"S98_S_crm_contact_ccp"}, | ||||||
|  | depends_on_past=False,  | ||||||
|  | retries=3,  | ||||||
|  | dag=dag) | ||||||
|  | 
 | ||||||
|  | ccp_mapping_table_8972 = SSHOperator( | ||||||
|  | ssh_hook=sshHook, | ||||||
|  | task_id='ccp_mapping_table_8972', | ||||||
|  | command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', | ||||||
|  | params={'my_param':"S98_S_ccp_mapping_table"}, | ||||||
|  | depends_on_past=False,  | ||||||
|  | retries=3,  | ||||||
|  | dag=dag) | ||||||
|  | 
 | ||||||
|  | crm_contact_part_605 = SSHOperator( | ||||||
|  | ssh_hook=sshHook, | ||||||
|  | task_id='crm_contact_part_605', | ||||||
|  | command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', | ||||||
|  | params={'my_param':"S98_S_crm_contact_part"}, | ||||||
|  | depends_on_past=False,  | ||||||
|  | retries=3,  | ||||||
|  | dag=dag) | ||||||
|  | 
 | ||||||
|  | file_country_cde = SSHOperator( | ||||||
|  | ssh_hook=sshHook, | ||||||
|  | task_id='file_country_cde', | ||||||
|  | command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }}  >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', | ||||||
|  | params={'my_param':"country_cde"}, | ||||||
|  | depends_on_past=False,  | ||||||
|  | retries=3,  | ||||||
|  | dag=dag) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | country_cde_3310 = SSHOperator( | ||||||
|  | ssh_hook=sshHook, | ||||||
|  | task_id='country_cde_3310', | ||||||
|  | command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', | ||||||
|  | params={'my_param':"S98_S_country_cde"}, | ||||||
|  | depends_on_past=False,  | ||||||
|  | retries=3,  | ||||||
|  | dag=dag) | ||||||
|  | 
 | ||||||
|  | file_china_city = SSHOperator( | ||||||
|  | ssh_hook=sshHook, | ||||||
|  | task_id='file_china_city', | ||||||
|  | command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }}  >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', | ||||||
|  | params={'my_param':"china_city"}, | ||||||
|  | depends_on_past=False,  | ||||||
|  | retries=3,  | ||||||
|  | dag=dag) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | file_CRM_account >> crm_account_4545 | ||||||
|  | file_CRM_contact_ccp >> crm_contact_ccp_5681 | ||||||
|  | file_CCP_mapping_table >> ccp_mapping_table_8972 | ||||||
|  | file_CRM_contact_part1 >> crm_contact_part_605 | ||||||
|  | file_country_cde >> country_cde_3310 | ||||||
|  | country_cde_3310 >> task_failed  | ||||||
		Loading…
	
		Reference in New Issue