add workflow 荟聚API_2,dev

This commit is contained in:
root 2024-05-06 14:09:07 +08:00
parent aa42b44d58
commit 25c009c0b7
3 changed files with 389 additions and 5 deletions

View File

@ -0,0 +1,50 @@
-- Deployment DDL for the SCRM contact table. The table is dropped first, so any
-- rows it already holds are discarded before it is re-created.
DROP TABLE IF EXISTS p30_common.d_scrm_contact;

CREATE TABLE IF NOT EXISTS p30_common.d_scrm_contact (
    -- business attributes
    scrm_leads_id             VARCHAR(20),
    city_name                 VARCHAR(20),
    company_name              VARCHAR(50),
    email                     VARCHAR(50),
    email_data_ind            CHAR(1),
    email_availability        CHAR(1),
    mobile_number             VARCHAR(20),
    mobile_phone_data_ind     CHAR(1),
    mobile_phone_availability CHAR(1),
    full_name                 VARCHAR(20),
    prov_name                 VARCHAR(20),
    create_time               TIMESTAMP(0),
    wechat_id                 VARCHAR(100),
    update_time               TIMESTAMP(0),
    -- ETL audit columns
    etl_batch_no              VARCHAR(50),
    etl_first_dt              TIMESTAMP(0),
    etl_job                   VARCHAR(50),
    etl_proc_dt               TIMESTAMP(0),
    etl_tx_dt                 TIMESTAMP(0),
    src_sysname               VARCHAR(50),
    src_table                 VARCHAR(50),
    PRIMARY KEY (scrm_leads_id)
);

-- Catalog descriptions (kept in Chinese; English gloss on the right).
COMMENT ON TABLE  p30_common.d_scrm_contact                           IS 'SCRM联系方式';              -- SCRM contact information
COMMENT ON COLUMN p30_common.d_scrm_contact.scrm_leads_id             IS 'SCRM线索ID';                -- SCRM lead ID
COMMENT ON COLUMN p30_common.d_scrm_contact.city_name                 IS '城市';                      -- city
COMMENT ON COLUMN p30_common.d_scrm_contact.company_name              IS '公司';                      -- company
COMMENT ON COLUMN p30_common.d_scrm_contact.email                     IS '邮箱';                      -- email
COMMENT ON COLUMN p30_common.d_scrm_contact.email_data_ind            IS '邮箱数据标志';              -- email data flag
COMMENT ON COLUMN p30_common.d_scrm_contact.email_availability        IS '邮箱有效性';                -- email validity
COMMENT ON COLUMN p30_common.d_scrm_contact.mobile_number             IS '手机号码';                  -- mobile number
COMMENT ON COLUMN p30_common.d_scrm_contact.mobile_phone_data_ind     IS '手机号数据标志';            -- mobile number data flag
COMMENT ON COLUMN p30_common.d_scrm_contact.mobile_phone_availability IS '手机号数据有效性';          -- mobile number validity
COMMENT ON COLUMN p30_common.d_scrm_contact.full_name                 IS '姓名';                      -- full name
COMMENT ON COLUMN p30_common.d_scrm_contact.prov_name                 IS '省份';                      -- province
COMMENT ON COLUMN p30_common.d_scrm_contact.create_time               IS '创建时间';                  -- creation time
COMMENT ON COLUMN p30_common.d_scrm_contact.wechat_id                 IS '身份-企业微信外部联系人';   -- identity: WeCom external contact
COMMENT ON COLUMN p30_common.d_scrm_contact.update_time               IS '更新时间';                  -- update time
COMMENT ON COLUMN p30_common.d_scrm_contact.etl_batch_no              IS '作业批次号';                -- job batch number
COMMENT ON COLUMN p30_common.d_scrm_contact.etl_first_dt              IS '最初入库时间';              -- first load time
COMMENT ON COLUMN p30_common.d_scrm_contact.etl_job                   IS '作业名称';                  -- job name
COMMENT ON COLUMN p30_common.d_scrm_contact.etl_proc_dt               IS '本次入库时间';              -- current load time
COMMENT ON COLUMN p30_common.d_scrm_contact.etl_tx_dt                 IS '作业运行时间';              -- job run time
COMMENT ON COLUMN p30_common.d_scrm_contact.src_sysname               IS '来源系统';                  -- source system
COMMENT ON COLUMN p30_common.d_scrm_contact.src_table                 IS '来源表';                    -- source table

View File

@ -0,0 +1,323 @@
/***************************************************************************************************/
/*script in Sql, generate by SdmCreateScript 2020(by Qihang Feng, QF255001@TERADATA.COM) */
/*VERSION 01.10 revised on 2020-08-25 */
/*Brilliance stems from wisdoms. */
/*************Head Section**************************************************************************/
/*Script Use: Periodically load data to :d_scrm_contact(SCRM联系方式) */
/*Create Date:2024-05-06 14:06:46 */
/*SDM Developed By: dev */
/*SDM Developed Date: 2024-01-18 */
/*SDM Checked By: dev */
/*SDM Checked Date: 2024-05-06 */
/*Script Developed By: dev */
/*Script Checked By: dev */
/*Source table 1: .t01_scrm_contact_his */
/*Source table 2: :SADB.select distinct on (scrm_leads_id) * from p20_pdm.t01_scrm_contact_update tscu
order by scrm_leads_id,update_time desc*/
/*Source table 3: p20_pdm.t01_scrm_contact */
/*Source table 4: :SADB.t01_scrm_contact_merge */
/*Job Type: Inbound transform (Tier 1 to Tier 2) */
/*Target Table:d_scrm_contact */
/*ETL Job Name:d_scrm_contact */
/*ETL Frequency:Daily */
/*ETL Policy:F2 */
/********************************************************************************************/
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
/* Staging table that receives the rows produced by this run; it takes its
   column layout from the target table d_scrm_contact. */
CREATE TEMP TABLE d_scrm_contact_agi_CUR_I (LIKE :COMMDB.d_scrm_contact)
    ON COMMIT PRESERVE ROWS;
/* Staging table that receives only the rows found to differ from the target;
   same column layout as the target table. */
CREATE TEMP TABLE d_scrm_contact_agi_INS (LIKE :COMMDB.d_scrm_contact)
    ON COMMIT PRESERVE ROWS;
/*****************************************************************************************************/
/* GROUP 1:Source Table:t01_scrm_contact_his**********************************************************/
/*****************************************************************************************************/
/* GROUP 1: stage every row of t01_scrm_contact_his. Where a row exists in
   t01_scrm_contact_update for the same scrm_leads_id, the one with the highest
   update_time supplies the attribute values and the history row is the fallback.
   Flag columns are CHAR(1), so every CASE branch yields a string literal. */
INSERT INTO d_scrm_contact_agi_CUR_I (
 scrm_leads_id /* SCRM lead ID */
 ,city_name /* city */
 ,company_name /* company */
 ,email /* email */
 ,email_data_ind /* email data flag */
 ,email_availability /* email validity */
 ,mobile_number /* mobile number */
 ,mobile_phone_data_ind /* mobile number data flag */
 ,mobile_phone_availability /* mobile number validity */
 ,full_name /* full name */
 ,prov_name /* province */
 ,create_time /* creation time */
 ,wechat_id /* identity: WeCom external contact */
 ,update_time /* update time */
 ,Etl_Batch_No /* job batch number */
 ,Etl_First_Dt /* first load time */
 ,Etl_Job /* job name */
 ,Etl_Proc_Dt /* current load time */
 ,Etl_Tx_Dt /* job run time */
 ,Src_Sysname /* source system */
 ,Src_Table /* source table */
)
SELECT
 COALESCE(TRIM(p0.scrm_leads_id),'') /*scrm_leads_id*/
 ,coalesce(p1.city_name,p0.city_name) /*city_name*/
 ,coalesce(p1.company_name,p0.company_name) /*company_name*/
 ,coalesce(p1.email,p0.email) /*email*/
 /* '1' when an email longer than one character is present */
 ,case when length(coalesce(p1.email,p0.email))>1 then '1' else '0' end /*email_data_ind*/
 /* the whole value must look like local@domain.tld: the pattern is anchored at
    both ends and the dot before the TLD is escaped so it matches a literal dot */
 ,case when coalesce(p1.email,p0.email) ~ '^[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,4}$' and length(coalesce(p1.email,p0.email))>10 then '1'
 else '0' end /*email_availability*/
 ,coalesce(p1.mobile_number,p0.mobile_number,'') /*mobile_number*/
 ,case when length(coalesce(p1.mobile_number,p0.mobile_number))>1 then '1' else '0' end /*mobile_phone_data_ind*/
 /* 11 digits, first digit 1, second digit 3-9 */
 ,case when coalesce(p1.mobile_number,p0.mobile_number) ~ '^1[3-9]\d{9}$' then '1'
 else '0' end /*mobile_phone_availability*/
 ,coalesce(p1.full_name,p0.full_name,'') /*full_name*/
 ,coalesce(p1.prov_name,p0.prov_name) /*prov_name*/
 ,COALESCE(p0.create_time,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*create_time*/
 ,coalesce(p1.wechat_id,p0.wechat_id) /*wechat_id*/
 ,COALESCE(p0.update_time,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*update_time*/
 ,'0' /*Etl_Batch_No*/
 ,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
 ,:ETLJOB /*Etl_Job*/
 ,current_timestamp(0) /*Etl_Proc_Dt*/
 ,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_Tx_Dt*/
 ,Substr('t01_scrm_contact_his',1,3) /*Src_Sysname*/
 ,'t01_scrm_contact_his' /*Src_Table*/
/* NOTE(review): the generated script qualified this table with the schema name
   None, which looks like an unset schema in the generator metadata. p20_pdm is
   assumed here because the other t01_scrm_contact tables read by this script
   live there. TODO confirm the schema of t01_scrm_contact_his. */
FROM p20_pdm.t01_scrm_contact_his p0
LEFT JOIN (select distinct on (scrm_leads_id) * from p20_pdm.t01_scrm_contact_update tscu
 order by scrm_leads_id,update_time desc) p1
 ON p0.scrm_leads_id = p1.scrm_leads_id
;
/*****************************************************************************************************/
/* GROUP 2:Source Table:t01_scrm_contact**************************************************************/
/*****************************************************************************************************/
/* GROUP 2: stage the rows of t01_scrm_contact that have no counterpart in
   t01_scrm_contact_merge (matched on merged_leads_id). Text attributes are
   trimmed and defaulted to the empty string; flag columns are CHAR(1), so every
   CASE branch yields a string literal. */
INSERT INTO d_scrm_contact_agi_CUR_I (
 scrm_leads_id /* SCRM lead ID */
 ,city_name /* city */
 ,company_name /* company */
 ,email /* email */
 ,email_data_ind /* email data flag */
 ,email_availability /* email validity */
 ,mobile_number /* mobile number */
 ,mobile_phone_data_ind /* mobile number data flag */
 ,mobile_phone_availability /* mobile number validity */
 ,full_name /* full name */
 ,prov_name /* province */
 ,create_time /* creation time */
 ,wechat_id /* identity: WeCom external contact */
 ,update_time /* update time */
 ,Etl_Batch_No /* job batch number */
 ,Etl_First_Dt /* first load time */
 ,Etl_Job /* job name */
 ,Etl_Proc_Dt /* current load time */
 ,Etl_Tx_Dt /* job run time */
 ,Src_Sysname /* source system */
 ,Src_Table /* source table */
)
SELECT
 COALESCE(TRIM(p0.scrm_leads_id),'') /*scrm_leads_id*/
 ,COALESCE(TRIM(p0.city_name),'') /*city_name*/
 ,COALESCE(TRIM(p0.company_name),'') /*company_name*/
 ,COALESCE(TRIM(p0.email),'') /*email*/
 /* '1' when an email longer than one character is present */
 ,case when length(p0.email)>1 then '1' else '0' end /*email_data_ind*/
 /* validity is judged on the trimmed value, i.e. the value actually stored; the
    pattern is anchored at both ends and the dot before the TLD is escaped so it
    matches a literal dot */
 ,case when TRIM(p0.email) ~ '^[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,4}$' and length(TRIM(p0.email))>10 then '1'
 else '0' end /*email_availability*/
 /* strip the '.0' left over from numeric formatting; default to the empty
    string like the other text attributes of this statement */
 ,COALESCE(replace(p0.mobile_number,'.0',''),'') /*mobile_number*/
 ,case when length(replace(p0.mobile_number,'.0',''))>1 then '1' else '0' end /*mobile_phone_data_ind*/
 /* 11 digits, first digit 1, second digit 3-9 */
 ,case when replace(p0.mobile_number,'.0','') ~ '^1[3-9]\d{9}$' then '1'
 else '0' end /*mobile_phone_availability*/
 ,COALESCE(TRIM(p0.full_name),'') /*full_name*/
 ,COALESCE(TRIM(p0.prov_name),'') /*prov_name*/
 ,COALESCE(p0.create_time,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*create_time*/
 ,COALESCE(TRIM(p0.wechat_id),'') /*wechat_id*/
 ,COALESCE(p0.update_time,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*update_time*/
 ,'0' /*Etl_Batch_No*/
 ,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
 ,:ETLJOB /*Etl_Job*/
 ,current_timestamp(0) /*Etl_Proc_Dt*/
 ,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_Tx_Dt*/
 ,Substr('t01_scrm_contact',1,3) /*Src_Sysname*/
 ,'t01_scrm_contact' /*Src_Table*/
FROM p20_pdm.t01_scrm_contact p0
LEFT JOIN :SADB.t01_scrm_contact_merge p1
 ON p0.scrm_leads_id = p1.merged_leads_id
/* anti-join: keep only contacts with no matching merge row.
   NOTE(review): this tests p1.scrm_leads_id, not the join key; it assumes that
   column is never NULL in the merge table. TODO confirm. */
where p1.scrm_leads_id is null
;
/* Copy into the INS staging table every staged row that is new or whose
   business attributes differ from the target table.
   The comparison is NULL-safe (IS NOT DISTINCT FROM): with plain equality a row
   holding a NULL in any compared column could never match its target row and
   would be treated as changed on every run. A staged row with no identical
   target row leaves P2 entirely NULL, so testing the target primary key alone
   is sufficient to detect it. */
INSERT INTO d_scrm_contact_agi_INS (
 city_name /* city */
 ,company_name /* company */
 ,email /* email */
 ,email_data_ind /* email data flag */
 ,email_availability /* email validity */
 ,mobile_number /* mobile number */
 ,mobile_phone_data_ind /* mobile number data flag */
 ,mobile_phone_availability /* mobile number validity */
 ,full_name /* full name */
 ,prov_name /* province */
 ,create_time /* creation time */
 ,wechat_id /* identity: WeCom external contact */
 ,update_time /* update time */
 ,scrm_leads_id /* SCRM lead ID */
 ,Etl_Batch_No /* job batch number */
 ,Etl_First_Dt /* first load time */
 ,Etl_Job /* job name */
 ,Etl_Proc_Dt /* current load time */
 ,Etl_Tx_Dt /* job run time */
 ,Src_Sysname /* source system */
 ,Src_Table /* source table */
)
SELECT
 P1.city_name
 ,P1.company_name
 ,P1.email
 ,P1.email_data_ind
 ,P1.email_availability
 ,P1.mobile_number
 ,P1.mobile_phone_data_ind
 ,P1.mobile_phone_availability
 ,P1.full_name
 ,P1.prov_name
 ,P1.create_time
 ,P1.wechat_id
 ,P1.update_time
 ,P1.scrm_leads_id
 ,P1.Etl_Batch_No
 ,P1.Etl_First_Dt
 ,P1.Etl_Job
 ,P1.Etl_Proc_Dt
 ,P1.Etl_Tx_Dt
 ,P1.Src_Sysname
 ,P1.Src_Table
FROM d_scrm_contact_agi_CUR_I P1
LEFT JOIN :COMMDB.d_scrm_contact P2
 /* plain equality on the key keeps the join hashable / index-friendly */
 ON P1.scrm_leads_id = P2.scrm_leads_id
 AND P1.city_name IS NOT DISTINCT FROM P2.city_name
 AND P1.company_name IS NOT DISTINCT FROM P2.company_name
 AND P1.email IS NOT DISTINCT FROM P2.email
 AND P1.email_data_ind IS NOT DISTINCT FROM P2.email_data_ind
 AND P1.email_availability IS NOT DISTINCT FROM P2.email_availability
 AND P1.mobile_number IS NOT DISTINCT FROM P2.mobile_number
 AND P1.mobile_phone_data_ind IS NOT DISTINCT FROM P2.mobile_phone_data_ind
 AND P1.mobile_phone_availability IS NOT DISTINCT FROM P2.mobile_phone_availability
 AND P1.full_name IS NOT DISTINCT FROM P2.full_name
 AND P1.prov_name IS NOT DISTINCT FROM P2.prov_name
 AND P1.create_time IS NOT DISTINCT FROM P2.create_time
 AND P1.wechat_id IS NOT DISTINCT FROM P2.wechat_id
 AND P1.update_time IS NOT DISTINCT FROM P2.update_time
/* scrm_leads_id is the target primary key, so it is NULL only when no
   identical target row was found */
WHERE P2.scrm_leads_id IS NULL
;
/* Upsert the staged new/changed rows into the target table, keyed on
   scrm_leads_id. On conflict every attribute and audit column is refreshed
   except Etl_First_Dt, which records when the row was first loaded and
   therefore keeps its existing value; the conflict key itself is not
   reassigned because it is equal by definition.
   NOTE(review): PostgreSQL rejects an ON CONFLICT DO UPDATE statement that
   touches the same target row twice, so this assumes the INS staging table holds
   at most one row per scrm_leads_id. TODO confirm the source groups cannot
   deliver the same lead ID more than once. */
INSERT INTO :COMMDB.d_scrm_contact (
 city_name /* city */
 ,company_name /* company */
 ,email /* email */
 ,email_data_ind /* email data flag */
 ,email_availability /* email validity */
 ,mobile_number /* mobile number */
 ,mobile_phone_data_ind /* mobile number data flag */
 ,mobile_phone_availability /* mobile number validity */
 ,full_name /* full name */
 ,prov_name /* province */
 ,create_time /* creation time */
 ,wechat_id /* identity: WeCom external contact */
 ,update_time /* update time */
 ,scrm_leads_id /* SCRM lead ID */
 ,Etl_Batch_No /* job batch number */
 ,Etl_First_Dt /* first load time */
 ,Etl_Job /* job name */
 ,Etl_Proc_Dt /* current load time */
 ,Etl_Tx_Dt /* job run time */
 ,Src_Sysname /* source system */
 ,Src_Table /* source table */
)
SELECT
 P1.city_name
 ,P1.company_name
 ,P1.email
 ,P1.email_data_ind
 ,P1.email_availability
 ,P1.mobile_number
 ,P1.mobile_phone_data_ind
 ,P1.mobile_phone_availability
 ,P1.full_name
 ,P1.prov_name
 ,P1.create_time
 ,P1.wechat_id
 ,P1.update_time
 ,P1.scrm_leads_id
 ,P1.Etl_Batch_No
 ,P1.Etl_First_Dt
 ,P1.Etl_Job
 ,P1.Etl_Proc_Dt
 ,P1.Etl_Tx_Dt
 ,P1.Src_Sysname
 ,P1.Src_Table
FROM d_scrm_contact_agi_INS P1
ON CONFLICT (scrm_leads_id)
DO UPDATE SET
 city_name=excluded.city_name
 ,company_name=excluded.company_name
 ,email=excluded.email
 ,email_data_ind=excluded.email_data_ind
 ,email_availability=excluded.email_availability
 ,mobile_number=excluded.mobile_number
 ,mobile_phone_data_ind=excluded.mobile_phone_data_ind
 ,mobile_phone_availability=excluded.mobile_phone_availability
 ,full_name=excluded.full_name
 ,prov_name=excluded.prov_name
 ,create_time=excluded.create_time
 ,wechat_id=excluded.wechat_id
 ,update_time=excluded.update_time
 ,Etl_Batch_No=excluded.Etl_Batch_No
 ,Etl_Job=excluded.Etl_Job
 ,Etl_Proc_Dt=excluded.Etl_Proc_Dt
 ,Etl_Tx_Dt=excluded.Etl_Tx_Dt
 ,Src_Sysname=excluded.Src_Sysname
 ,Src_Table=excluded.Src_Table
;
/***** End of program: quit psql */
\q

View File

@ -152,7 +152,7 @@ dag=dag)
t01_scrm_leads = SSHOperator( t01_scrm_leads = SSHOperator(
ssh_hook=sshHook, ssh_hook=sshHook,
task_id='t01_scrm_leads', task_id='t01_scrm_leads',
command='/data/airflow/etl/PDM/run_sa.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"t01_scrm_leads_agi"}, params={'my_param':"t01_scrm_leads_agi"},
depends_on_past=False, depends_on_past=False,
retries=3, retries=3,
@ -160,7 +160,7 @@ dag=dag)
t01_scrm_contact = SSHOperator( t01_scrm_contact = SSHOperator(
ssh_hook=sshHook, ssh_hook=sshHook,
task_id='t01_scrm_contact', task_id='t01_scrm_contact',
command='/data/airflow/etl/PDM/run_sa.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"t01_scrm_contact_agi"}, params={'my_param':"t01_scrm_contact_agi"},
depends_on_past=False, depends_on_past=False,
retries=3, retries=3,
@ -168,7 +168,7 @@ dag=dag)
t01_scrm_contact_update = SSHOperator( t01_scrm_contact_update = SSHOperator(
ssh_hook=sshHook, ssh_hook=sshHook,
task_id='t01_scrm_contact_update', task_id='t01_scrm_contact_update',
command='/data/airflow/etl/PDM/run_sa.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"t01_scrm_contact_update_agi"}, params={'my_param':"t01_scrm_contact_update_agi"},
depends_on_past=False, depends_on_past=False,
retries=3, retries=3,
@ -176,11 +176,19 @@ dag=dag)
t01_scrm_contact_merge = SSHOperator( t01_scrm_contact_merge = SSHOperator(
ssh_hook=sshHook, ssh_hook=sshHook,
task_id='t01_scrm_contact_merge', task_id='t01_scrm_contact_merge',
command='/data/airflow/etl/PDM/run_sa.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ', command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"t01_scrm_contact_merge_agi"}, params={'my_param':"t01_scrm_contact_merge_agi"},
depends_on_past=False, depends_on_past=False,
retries=3, retries=3,
dag=dag) dag=dag)
# Airflow task d_scrm_contact: over SSH, runs the run_psql.sh wrapper under
# etl/COM with the execution date (ds_nodash) and the job name
# d_scrm_contact_agi, appending stdout and stderr to the per-day
# run_tpt_<date>.log file. Up to 3 retries; does not depend on the previous run.
d_scrm_contact = SSHOperator(
ssh_hook=sshHook,
task_id='d_scrm_contact',
command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"d_scrm_contact_agi"},
depends_on_past=False,
retries=3,
dag=dag)
customer_events_3292 >> customer_events_feign customer_events_3292 >> customer_events_feign
customer_events_open_content_page_9684 >> custom_events_open_content_page_feign customer_events_open_content_page_9684 >> custom_events_open_content_page_feign
customer_event_meta_2268 >> customer_event_meta_feign customer_event_meta_2268 >> customer_event_meta_feign
@ -191,4 +199,7 @@ custom_events_open_page_load >> t01_scrm_leads
t01_scrm_leads >> t01_scrm_contact_update t01_scrm_leads >> t01_scrm_contact_update
t01_scrm_leads >> t01_scrm_contact t01_scrm_leads >> t01_scrm_contact
t01_scrm_contact >> t01_scrm_contact_merge t01_scrm_contact >> t01_scrm_contact_merge
t01_scrm_contact_merge >> task_failed t01_scrm_contact_update >> d_scrm_contact
t01_scrm_contact >> d_scrm_contact
t01_scrm_contact_merge >> d_scrm_contact
d_scrm_contact >> task_failed