diff --git a/TK_Cust/dev/tk_crm/country_cde/S98_S_country_cde.sql b/TK_Cust/dev/tk_crm/country_cde/S98_S_country_cde.sql new file mode 100644 index 0000000..3249aa6 --- /dev/null +++ b/TK_Cust/dev/tk_crm/country_cde/S98_S_country_cde.sql @@ -0,0 +1,50 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on +delete from p10_sa.S98_S_country_cde +; +insert into p10_sa.S98_S_country_cde +( country_cn_name + , country_en_name + , country_cd + , country_abbr + , country_number + , inter_tel_cd + , internet_name + , etl_tx_dt ) + select + country_cn_name + , country_en_name + , country_cd + , country_abbr + , country_number + , inter_tel_cd + , internet_name + , etl_tx_dt + from p00_tal.S98_S_country_cde + ; + delete from p12_sfull.S98_S_country_cde +; +; +insert into p12_sfull.S98_S_country_cde +( country_cn_name + , country_en_name + , country_cd + , country_abbr + , country_number + , inter_tel_cd + , internet_name + , etl_tx_dt ) + select + country_cn_name + , country_en_name + , country_cd + , country_abbr + , country_number + , inter_tel_cd + , internet_name + , etl_tx_dt + from p10_sa.S98_S_country_cde +; +\q \ No newline at end of file diff --git a/TK_Cust/dev/tk_crm/country_cde/sa_foreign_tables.sql b/TK_Cust/dev/tk_crm/country_cde/sa_foreign_tables.sql new file mode 100644 index 0000000..6bb22dc --- /dev/null +++ b/TK_Cust/dev/tk_crm/country_cde/sa_foreign_tables.sql @@ -0,0 +1,19 @@ + +CREATE FOREIGN TABLE if not exists p00_tal.S98_S_country_cde ( + country_cn_name TEXT + , country_en_name TEXT + , country_cd TEXT + , country_abbr TEXT + , country_number TEXT + , inter_tel_cd TEXT + , internet_name TEXT + , etl_tx_dt TIMESTAMP +) + + +SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'country_cde' ); + + + + + diff --git a/TK_Cust/dev/tk_crm/country_cde/sa_tables.sql b/TK_Cust/dev/tk_crm/country_cde/sa_tables.sql new file mode 100644 index 0000000..f05cfa4 --- /dev/null +++ b/TK_Cust/dev/tk_crm/country_cde/sa_tables.sql @@ -0,0 +1,47 @@ + +create table if not exists p10_sa.S98_S_country_cde ( + country_cn_name TEXT + , country_en_name TEXT + , country_cd TEXT + , country_abbr TEXT + , country_number TEXT + , inter_tel_cd TEXT + , internet_name TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p10_sa.S98_S_country_cde.country_cn_name IS '国家'; + COMMENT ON COLUMN p10_sa.S98_S_country_cde.country_en_name IS '国家英文名'; + COMMENT ON COLUMN p10_sa.S98_S_country_cde.country_cd IS '国家代码'; + COMMENT ON COLUMN p10_sa.S98_S_country_cde.country_abbr IS '国家缩写'; + COMMENT ON COLUMN p10_sa.S98_S_country_cde.country_number IS '数字代码'; + COMMENT ON COLUMN p10_sa.S98_S_country_cde.inter_tel_cd IS '国际区号'; + COMMENT ON COLUMN p10_sa.S98_S_country_cde.internet_name IS '域名'; + COMMENT ON COLUMN p10_sa.S98_S_country_cde.etl_tx_dt IS ''; + +COMMENT ON TABLE p10_sa.S98_S_country_cde IS ''; + + + +create table if not exists p12_sfull.S98_S_country_cde ( + country_cn_name TEXT + , country_en_name TEXT + , country_cd TEXT + , country_abbr TEXT + , country_number TEXT + , inter_tel_cd TEXT + , internet_name TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p12_sfull.S98_S_country_cde.country_cn_name IS '国家'; + COMMENT ON COLUMN p12_sfull.S98_S_country_cde.country_en_name IS '国家英文名'; + COMMENT ON COLUMN p12_sfull.S98_S_country_cde.country_cd IS '国家代码'; + COMMENT ON COLUMN p12_sfull.S98_S_country_cde.country_abbr IS '国家缩写'; + COMMENT ON COLUMN p12_sfull.S98_S_country_cde.country_number IS '数字代码'; + COMMENT ON COLUMN p12_sfull.S98_S_country_cde.inter_tel_cd IS '国际区号'; + COMMENT ON COLUMN p12_sfull.S98_S_country_cde.internet_name IS '域名'; + COMMENT ON COLUMN p12_sfull.S98_S_country_cde.etl_tx_dt IS ''; + +COMMENT ON TABLE p12_sfull.S98_S_country_cde IS ''; + diff --git a/TK_Cust/dev/tk_crm/泰克CRM/wf_dag_tk_crm.py b/TK_Cust/dev/tk_crm/泰克CRM/wf_dag_tk_crm.py new file mode 100644 index 0000000..e86e4d8 --- /dev/null +++ b/TK_Cust/dev/tk_crm/泰克CRM/wf_dag_tk_crm.py @@ -0,0 +1,151 @@ +#!/usr/bin/python +# -*- encoding=utf-8 -*- +from airflow import DAG +from datetime import datetime, timedelta +from airflow.contrib.hooks.ssh_hook import SSHHook +from airflow.contrib.operators.ssh_operator import SSHOperator +from airflow.sensors.external_task_sensor import ExternalTaskSensor +import json + +from airflow.operators.email_operator import EmailOperator +from airflow.utils.trigger_rule import TriggerRule + + +sshHook = SSHHook(ssh_conn_id ='ssh_air') +default_args = { +'owner': 'info@idgvalue.com', +'email': [''], +'email_on_failure': True, +'email_on_retry':True, +'start_date': datetime(2022, 9, 12), +'depends_on_past': False, +'retries': 6, +'retry_delay': timedelta(minutes=10), +} + +dag = DAG('wf_dag_tk_crm', default_args=default_args, +schedule_interval="0 0 * * *", +catchup=False, +dagrun_timeout=timedelta(minutes=160), +max_active_runs=3) + +task_failed = EmailOperator ( + dag=dag, + trigger_rule=TriggerRule.ONE_FAILED, + task_id="task_failed", + to=["info@idgvalue.com"], + cc=[""], + subject="tk_crm_failed", + html_content='

您好,tk_crm作业失败,请及时处理"

') + +file_CRM_account = SSHOperator( +ssh_hook=sshHook, +task_id='file_CRM_account', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"CRM_account"}, +depends_on_past=False, +retries=3, +dag=dag) + + +file_CRM_contact_ccp = SSHOperator( +ssh_hook=sshHook, +task_id='file_CRM_contact_ccp', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"CRM_contact_ccp"}, +depends_on_past=False, +retries=3, +dag=dag) + + +file_CRM_contact_part1 = SSHOperator( +ssh_hook=sshHook, +task_id='file_CRM_contact_part1', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"CRM_contact_part1"}, +depends_on_past=False, +retries=3, +dag=dag) + + +file_CCP_mapping_table = SSHOperator( +ssh_hook=sshHook, +task_id='file_CCP_mapping_table', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"CCP_mapping_table"}, +depends_on_past=False, +retries=3, +dag=dag) + + +crm_account_4545 = SSHOperator( +ssh_hook=sshHook, +task_id='crm_account_4545', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_crm_account"}, +depends_on_past=False, +retries=3, +dag=dag) + +crm_contact_ccp_5681 = SSHOperator( +ssh_hook=sshHook, +task_id='crm_contact_ccp_5681', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_crm_contact_ccp"}, +depends_on_past=False, +retries=3, +dag=dag) + +ccp_mapping_table_8972 = SSHOperator( +ssh_hook=sshHook, +task_id='ccp_mapping_table_8972', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_ccp_mapping_table"}, +depends_on_past=False, +retries=3, +dag=dag) + +crm_contact_part_605 = SSHOperator( +ssh_hook=sshHook, +task_id='crm_contact_part_605', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_crm_contact_part"}, +depends_on_past=False, +retries=3, +dag=dag) + +file_country_cde = SSHOperator( +ssh_hook=sshHook, +task_id='file_country_cde', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"country_cde"}, +depends_on_past=False, +retries=3, +dag=dag) + + +country_cde_3310 = SSHOperator( +ssh_hook=sshHook, +task_id='country_cde_3310', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_country_cde"}, +depends_on_past=False, +retries=3, +dag=dag) + +file_china_city = SSHOperator( +ssh_hook=sshHook, +task_id='file_china_city', +command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} >>/data/airflow/logs/file_load/file_load_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"china_city"}, +depends_on_past=False, +retries=3, +dag=dag) + + +file_CRM_account >> crm_account_4545 +file_CRM_contact_ccp >> crm_contact_ccp_5681 +file_CCP_mapping_table >> ccp_mapping_table_8972 +file_CRM_contact_part1 >> crm_contact_part_605 +file_country_cde >> country_cde_3310 +country_cde_3310 >> task_failed