add workflow 泰克客户,dev

This commit is contained in:
root 2024-04-29 14:46:22 +08:00
parent 5228669c88
commit aa42b44d58
3 changed files with 309 additions and 10 deletions

View File

@ -0,0 +1,30 @@
DROP TABLE IF EXISTS p30_common.d_livechat_contact;
CREATE TABLE IF NOT EXISTS p30_common.d_livechat_contact (
livechat_leads_id VARCHAR(20)
, full_name VARCHAR(0)
, mobile_phone VARCHAR(0)
, mobile_phone_data_ind text
, mobile_phone_availability text
, email VARCHAR(0)
, email_data_ind text
, email_availability text
, company_name VARCHAR(0)
, visitor_country_code VARCHAR(2)
, pre_chat_private_policy VARCHAR(10)
);
COMMENT ON COLUMN p30_common.d_livechat_contact.livechat_leads_id IS '';
COMMENT ON COLUMN p30_common.d_livechat_contact.full_name IS '';
COMMENT ON COLUMN p30_common.d_livechat_contact.mobile_phone IS '';
COMMENT ON COLUMN p30_common.d_livechat_contact.mobile_phone_data_ind IS '';
COMMENT ON COLUMN p30_common.d_livechat_contact.mobile_phone_availability IS '';
COMMENT ON COLUMN p30_common.d_livechat_contact.email IS '';
COMMENT ON COLUMN p30_common.d_livechat_contact.email_data_ind IS '';
COMMENT ON COLUMN p30_common.d_livechat_contact.email_availability IS '';
COMMENT ON COLUMN p30_common.d_livechat_contact.company_name IS '';
COMMENT ON COLUMN p30_common.d_livechat_contact.visitor_country_code IS '';
COMMENT ON COLUMN p30_common.d_livechat_contact.pre_chat_private_policy IS '';
COMMENT ON TABLE p30_common.d_livechat_contact IS '';

View File

@ -0,0 +1,269 @@
/***************************************************************************************************/
/*script in Sql, generate by SdmCreateScript 2020(by Qihang Feng, QF255001@TERADATA.COM) */
/*VERSION 01.10 revised on 2020-08-25 */
/*Brilliance stems from wisdoms. */
/*************Head Section**************************************************************************/
/*Script Use: Periodically load data to :d_livechat_contact(Livechat联系信息) */
/*Create Date:2024-04-29 14:45:07 */
/*SDM Developed By: dev */
/*SDM Developed Date: 2024-01-19 */
/*SDM Checked By: dev */
/*SDM Checked Date: 2024-04-29 */
/*Script Developed By: dev */
/*Script Checked By: dev */
/*Source table 1: p20_pdm.t01_livechat_record */
/*Job Type: Inbound transform (Tier 1 to Tier 2) */
/*Target Table:d_livechat_contact */
/*ETL Job Name:d_livechat_contact */
/*ETL Frequency:Daily */
/*ETL Policy:F2 */
/********************************************************************************************/
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
/*创建临时表加载当前数据 */
CREATE TEMPORARY TABLE d_livechat_contact_agi_CUR_I
( LIKE :COMMDB.d_livechat_contact)
ON COMMIT PRESERVE ROWS;
/*创建临时表加载不同数据 */
CREATE TEMPORARY TABLE d_livechat_contact_agi_INS
( LIKE :COMMDB.d_livechat_contact)
ON COMMIT PRESERVE ROWS;
/*****************************************************************************************************/
/* GROUP 1:Source Table:t01_livechat_record***********************************************************/
/*****************************************************************************************************/
INSERT INTO d_livechat_contact_agi_CUR_I (
livechat_leads_id /*Livechat线索编号*/
,full_name /*姓名*/
,mobile_phone /*手机*/
,mobile_phone_data_ind /*手机数据标志*/
,mobile_phone_availability /*手机合规标志*/
,email /*邮箱*/
,email_data_ind /*邮箱数据标志*/
,email_availability /*邮箱合规标志*/
,company_name /*公司*/
,visitor_country_code /*访客国家代码*/
,pre_chat_private_policy /*隐私政策同意标志*/
,create_time /*创建时间*/
,etl_batch_no /*作业批次号*/
,etl_first_dt /*最初入库时间*/
,etl_job /*作业名称*/
,etl_proc_dt /*本次入库时间*/
,etl_tx_dt /*作业运行时间*/
,src_sysname /*来源系统*/
,src_table /*来源表*/
)
SELECT
COALESCE(TRIM(p0.livechat_leads_id),'') /*livechat_leads_id*/
,case when trim(pre_chat_name)<>'' then pre_chat_name
when trim(pre_chat_name_j) <>'' then pre_chat_name_j
when trim(pre_chat_name_k) <>'' then pre_chat_name_k
when trim(visitor_nick) <>'' then visitor_nick
else '' end full_name /*full_name*/
,case when length(pre_chat_contact) >10 then pre_chat_contact
when length(pre_chat_contact_t) >10 then pre_chat_contact_t
when length(pre_chat_contact_k) >10 then pre_chat_contact_k
when length(pre_chat_contact_2) >10 then pre_chat_contact_2
when length(pre_chat_contact_3) >10 then pre_chat_contact_3
else '' end mobile_phone /*mobile_phone*/
,case when length(pre_chat_contact) >1 then '1'
when length(pre_chat_contact_t) >1 then '1'
when length(pre_chat_contact_k) >1 then '1'
when length(pre_chat_contact_2) >1 then '1'
when length(pre_chat_contact_3) >1 then '1'
else '0' end mobile_phone_data_ind /*mobile_phone_data_ind*/
,case when pre_chat_contact ~ '^1[3-9]\d{9}$' then '1'
when pre_chat_contact_t ~ '^1[3-9]\d{9}$' then '1'
when pre_chat_contact_k ~ '^1[3-9]\d{9}$' then '1'
when pre_chat_contact_2 ~ '^1[3-9]\d{9}$' then '1'
when pre_chat_contact_3 ~ '^1[3-9]\d{9}$' then '1'
else '0' end mobile_phone_availability /*mobile_phone_availability*/
,case when pre_chat_email ~ '[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,4}$' then pre_chat_email
when pre_chat_email_t ~ '[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,4}$' then pre_chat_email_t
when pre_chat_email_k ~ '[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,4}$' then pre_chat_email_k
when pre_chat_email_j ~ '[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,4}$' then pre_chat_email_j
else '' end email /*email*/
,case when length(pre_chat_email) >1 then '1'
when length(pre_chat_email_t) >1 then '1'
when length(pre_chat_email_k) >1 then '1'
when length(pre_chat_email_j) >1 then '1'
else '0' end email_data_ind /*email_data_ind*/
,case when pre_chat_email ~ '[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,4}$' then '1'
when pre_chat_email_t ~ '[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,4}$' then '1'
when pre_chat_email_k ~ '[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,4}$' then '1'
when pre_chat_email_j ~ '[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,4}$' then '1'
else '0' end email_availability /*email_availability*/
,case when trim(pre_chat_company) <>'' then pre_chat_company
when trim(pre_chat_company_t) <>'' then pre_chat_company_t
when trim(pre_chat_company_k) <>'' then pre_chat_company_k
else '' end company_name /*company_name*/
,COALESCE(TRIM(p0.visitor_country_code),'') /*visitor_country_code*/
,COALESCE(TRIM(p0.pre_chat_private_policy),'') /*pre_chat_private_policy*/
,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD') /*create_time*/
,0 /*etl_batch_no*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*etl_first_dt*/
,:ETLJOB /*etl_job*/
,current_timestamp(0) /*etl_proc_dt*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*etl_tx_dt*/
,Substr('t01_livechat_record',1,3) /*src_sysname*/
,'t01_livechat_record' /*src_table*/
FROM p20_pdm.t01_livechat_record p0
;
/*将不同数据插入到临时表 */
;INSERT INTO d_livechat_contact_agi_INS (
full_name /*姓名*/
,mobile_phone /*手机*/
,mobile_phone_data_ind /*手机数据标志*/
,mobile_phone_availability /*手机合规标志*/
,email /*邮箱*/
,email_data_ind /*邮箱数据标志*/
,email_availability /*邮箱合规标志*/
,company_name /*公司*/
,visitor_country_code /*访客国家代码*/
,pre_chat_private_policy /*隐私政策同意标志*/
,create_time /*创建时间*/
,livechat_leads_id /*Livechat线索编号*/
,etl_batch_no /*作业批次号*/
,etl_first_dt /*最初入库时间*/
,etl_job /*作业名称*/
,etl_proc_dt /*本次入库时间*/
,etl_tx_dt /*作业运行时间*/
,src_sysname /*来源系统*/
,src_table /*来源表*/
)
SELECT
P1.full_name /*姓名*/
,P1.mobile_phone /*手机*/
,P1.mobile_phone_data_ind /*手机数据标志*/
,P1.mobile_phone_availability /*手机合规标志*/
,P1.email /*邮箱*/
,P1.email_data_ind /*邮箱数据标志*/
,P1.email_availability /*邮箱合规标志*/
,P1.company_name /*公司*/
,P1.visitor_country_code /*访客国家代码*/
,P1.pre_chat_private_policy /*隐私政策同意标志*/
,P1.create_time /*创建时间*/
,P1.livechat_leads_id /*Livechat线索编号*/
,P1.etl_batch_no /*作业批次号*/
,P1.etl_first_dt /*最初入库时间*/
,P1.etl_job /*作业名称*/
,P1.etl_proc_dt /*本次入库时间*/
,P1.etl_tx_dt /*作业运行时间*/
,P1.src_sysname /*来源系统*/
,P1.src_table /*来源表*/
FROM d_livechat_contact_agi_CUR_I P1
LEFT JOIN :COMMDB.d_livechat_contact P2
ON P1.full_name = P2.full_name
AND P1.mobile_phone = P2.mobile_phone
AND P1.mobile_phone_data_ind = P2.mobile_phone_data_ind
AND P1.mobile_phone_availability = P2.mobile_phone_availability
AND P1.email = P2.email
AND P1.email_data_ind = P2.email_data_ind
AND P1.email_availability = P2.email_availability
AND P1.company_name = P2.company_name
AND P1.visitor_country_code = P2.visitor_country_code
AND P1.pre_chat_private_policy = P2.pre_chat_private_policy
AND P1.create_time = P2.create_time
AND P1.livechat_leads_id = P2.livechat_leads_id
WHERE P2.full_name IS NULL
OR P2.mobile_phone IS NULL
OR P2.mobile_phone_data_ind IS NULL
OR P2.mobile_phone_availability IS NULL
OR P2.email IS NULL
OR P2.email_data_ind IS NULL
OR P2.email_availability IS NULL
OR P2.company_name IS NULL
OR P2.visitor_country_code IS NULL
OR P2.pre_chat_private_policy IS NULL
OR P2.create_time IS NULL
OR P2.livechat_leads_id IS NULL
;
/*将新增数据插入到目标表 */
;INSERT INTO :COMMDB.d_livechat_contact (
full_name /*姓名*/
,mobile_phone /*手机*/
,mobile_phone_data_ind /*手机数据标志*/
,mobile_phone_availability /*手机合规标志*/
,email /*邮箱*/
,email_data_ind /*邮箱数据标志*/
,email_availability /*邮箱合规标志*/
,company_name /*公司*/
,visitor_country_code /*访客国家代码*/
,pre_chat_private_policy /*隐私政策同意标志*/
,create_time /*创建时间*/
,livechat_leads_id /*Livechat线索编号*/
,etl_batch_no /*作业批次号*/
,etl_first_dt /*最初入库时间*/
,etl_job /*作业名称*/
,etl_proc_dt /*本次入库时间*/
,etl_tx_dt /*作业运行时间*/
,src_sysname /*来源系统*/
,src_table /*来源表*/
)
SELECT
P1.full_name /*姓名*/
,P1.mobile_phone /*手机*/
,P1.mobile_phone_data_ind /*手机数据标志*/
,P1.mobile_phone_availability /*手机合规标志*/
,P1.email /*邮箱*/
,P1.email_data_ind /*邮箱数据标志*/
,P1.email_availability /*邮箱合规标志*/
,P1.company_name /*公司*/
,P1.visitor_country_code /*访客国家代码*/
,P1.pre_chat_private_policy /*隐私政策同意标志*/
,P1.create_time /*创建时间*/
,P1.livechat_leads_id /*Livechat线索编号*/
,P1.etl_batch_no /*作业批次号*/
,P1.etl_first_dt /*最初入库时间*/
,P1.etl_job /*作业名称*/
,P1.etl_proc_dt /*本次入库时间*/
,P1.etl_tx_dt /*作业运行时间*/
,P1.src_sysname /*来源系统*/
,P1.src_table /*来源表*/
FROM d_livechat_contact_agi_INS P1
ON CONFLICT ( livechat_leads_id)
DO UPDATE SET
livechat_leads_id=excluded.livechat_leads_id
,full_name=excluded.full_name
,mobile_phone=excluded.mobile_phone
,mobile_phone_data_ind=excluded.mobile_phone_data_ind
,mobile_phone_availability=excluded.mobile_phone_availability
,email=excluded.email
,email_data_ind=excluded.email_data_ind
,email_availability=excluded.email_availability
,company_name=excluded.company_name
,visitor_country_code=excluded.visitor_country_code
,pre_chat_private_policy=excluded.pre_chat_private_policy
,create_time=excluded.create_time
,etl_batch_no=excluded.etl_batch_no
,etl_first_dt=excluded.etl_first_dt
,etl_job=excluded.etl_job
,etl_proc_dt=excluded.etl_proc_dt
,etl_tx_dt=excluded.etl_tx_dt
,src_sysname=excluded.src_sysname
,src_table=excluded.src_table
;
/*****程序结束退出 */
\q

View File

@ -172,14 +172,6 @@ params={'my_param':"t01_udesk_record_agi"},
depends_on_past=False,
retries=3,
dag=dag)
d_crm_contact = SSHOperator(
ssh_hook=sshHook,
task_id='d_crm_contact',
command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"d_crm_contact_agi"},
depends_on_past=False,
retries=3,
dag=dag)
d_udesk_contact = SSHOperator(
ssh_hook=sshHook,
task_id='d_udesk_contact',
@ -188,6 +180,14 @@ params={'my_param':"d_udesk_contact_agi"},
depends_on_past=False,
retries=3,
dag=dag)
d_livechat_contact = SSHOperator(
ssh_hook=sshHook,
task_id='d_livechat_contact',
command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"d_livechat_contact_agi"},
depends_on_past=False,
retries=3,
dag=dag)
file_Tk01 >> livechat_6381
file_Udesk_record >> udesk_record_3768
file_CRM_Raw_Leads >> crm_raw_leads_6024
@ -198,6 +198,6 @@ crm_raw_leads_6024 >> t01_crm_raw_leads
country_cde_3310 >> t00_country_info
china_city_4536 >> t00_china_city_info
udesk_record_3768 >> t01_udesk_record
t01_livechat_record >> d_crm_contact
t01_udesk_record >> d_udesk_contact
d_udesk_contact >> task_failed
t01_livechat_record >> d_livechat_contact
d_livechat_contact >> task_failed