add workflow 泰克客户,dev

This commit is contained in:
root 2024-06-17 11:00:56 +08:00
parent 99ad884513
commit 303f6da0b6
5 changed files with 617 additions and 0 deletions

View File

@ -0,0 +1,34 @@
DROP TABLE IF EXISTS p60_mart.cust_enagement_records;
CREATE TABLE IF NOT EXISTS p60_mart.cust_enagement_records (
contact_id varchar(20)
, full_name varchar(200)
, record_type varchar(30)
, record_id varchar(50)
, enagement_record varchar(500)
, record_time timestamp(0)
, Etl_Batch_No varchar(50)
, Etl_First_Dt timestamp(0)
, Etl_Job varchar(200)
, Etl_Proc_Dt timestamp(0)
, Etl_Tx_Dt timestamp(0)
, Src_Sysname varchar(50)
, Src_Table varchar(50)
,primary key( contact_id,record_id,enagement_record,record_time )
);
COMMENT ON COLUMN p60_mart.cust_enagement_records.contact_id IS '客户编号';
COMMENT ON COLUMN p60_mart.cust_enagement_records.full_name IS '全名';
COMMENT ON COLUMN p60_mart.cust_enagement_records.record_type IS '记录类型';
COMMENT ON COLUMN p60_mart.cust_enagement_records.record_id IS '记录编号';
COMMENT ON COLUMN p60_mart.cust_enagement_records.enagement_record IS '接触记录';
COMMENT ON COLUMN p60_mart.cust_enagement_records.record_time IS '记录时间';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Etl_Batch_No IS '作业批次号';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Etl_First_Dt IS '最初入库时间';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Etl_Job IS '作业名称';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Etl_Proc_Dt IS '本次入库时间';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Etl_Tx_Dt IS '作业运行时间';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Src_Sysname IS '来源系统';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Src_Table IS '来源表';
COMMENT ON TABLE p60_mart.cust_enagement_records IS '客户接触记录';

View File

@ -0,0 +1,202 @@
/***************************************************************************************************/
/*script in Sql, generate by SdmCreateScript 2020(by Qihang Feng, QF255001@TERADATA.COM) */
/*VERSION 01.10 revised on 2020-08-25 */
/*Brilliance stems from wisdoms. */
/*************Head Section**************************************************************************/
/*Script Use: Periodically load data to :cust_enagement_records(客户接触记录) */
/*Create Date:2024-06-12 10:43:33 */
/*SDM Developed By: dev */
/*SDM Developed Date: 2024-06-04 */
/*SDM Checked By: dev */
/*SDM Checked Date: 2024-06-12 */
/*Script Developed By: dev */
/*Script Checked By: dev */
/*Source table 1: :COMMDB.cust_leads_detail */
/*Source table 2: :COMMDB.select distinct on (contact_id) contact_id,full_name,contact_channel from p30_COMMON.cust_contact_info*/
/*Source table 3: :COMMDB.v_cust_contact_mapping */
/*Job Type: Inbound transform (Tier 1 to Tier 2) */
/*Target Table:cust_enagement_records */
/*ETL Job Name:cust_enagement_records */
/*ETL Frequency:Daily */
/*ETL Policy:F2 */
/********************************************************************************************/
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
/*创建临时表加载当前数据 */
CREATE TEMPORARY TABLE cust_enagement_records_agi_CUR_I
( LIKE :MARTDB.cust_enagement_records)
ON COMMIT PRESERVE ROWS;
/*创建临时表加载不同数据 */
CREATE TEMPORARY TABLE cust_enagement_records_agi_INS
( LIKE :MARTDB.cust_enagement_records)
ON COMMIT PRESERVE ROWS;
/*****************************************************************************************************/
/* GROUP 1:Source Table:cust_leads_detail*************************************************************/
/*****************************************************************************************************/
INSERT INTO cust_enagement_records_agi_CUR_I (
contact_id /*客户编号*/
,full_name /*全名*/
,record_type /*记录类型*/
,record_id /*记录编号*/
,enagement_record /*接触记录*/
,record_time /*记录时间*/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Job /*作业名称*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
)
SELECT
coalesce(p2.contact_id,'') /*contact_id*/
,coalesce(p2.full_name,'') /*full_name*/
,COALESCE(TRIM(CAST(p0.contact_channel AS varchar(30))),'') /*record_type*/
,case when p0.contact_channel = 'CRM Raw Leads' then p0."label"
else p0.contact_account
end /*record_id*/
,case when p0.contact_channel = 'CRM Raw Leads' then p0.contact_channel ||'_'|| p0."label" ||'_'|| p0."event"
when p0.contact_channel = 'Chat' then p0.contact_channel ||'_'|| p0."event"
when p0.contact_channel = 'Call' then p0.contact_channel ||'_'|| p0."label" ||'_'|| p0."event"
when p0.contact_channel = 'SCRM Event' then p0.contact_channel ||'_'|| p0."label" ||'_'|| p0."event"
end /*enagement_record*/
,COALESCE(p0.active_time,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*record_time*/
,0 /*Etl_Batch_No*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
,:ETLJOB /*Etl_Job*/
,current_timestamp(0) /*Etl_Proc_Dt*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_Tx_Dt*/
,Substr('cust_leads_detail',1,3) /*Src_Sysname*/
,'cust_leads_detail' /*Src_Table*/
FROM :COMMDB.cust_leads_detail p0
LEFT JOIN :COMMDB.v_cust_contact_mapping p1
ON p0 .contact_account =p1 .contact
and p1 .contact_channel = case when p0.contact_channel = 'CRM Raw Leads' then 'CRM'
when p0.contact_channel = 'Chat' then 'Livechat'
when p0.contact_channel = 'Call' then 'Udesk'
when p0.contact_channel = 'SCRM Event' then 'SCRM'
end
LEFT JOIN (select distinct on (contact_id) contact_id,full_name,contact_channel from p30_COMMON.cust_contact_info) p2
ON p1.contact_id=p2.contact_id
where coalesce(p2.contact_id,'')<>''
;
/*将不同数据插入到临时表 */
;INSERT INTO cust_enagement_records_agi_INS (
full_name /*全名*/
,record_type /*记录类型*/
,contact_id /*客户编号*/
,record_id /*记录编号*/
,enagement_record /*接触记录*/
,record_time /*记录时间*/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Job /*作业名称*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
)
SELECT
P1.full_name /*全名*/
,P1.record_type /*记录类型*/
,P1.contact_id /*客户编号*/
,P1.record_id /*记录编号*/
,P1.enagement_record /*接触记录*/
,P1.record_time /*记录时间*/
,P1.Etl_Batch_No /*作业批次号*/
,P1.Etl_First_Dt /*最初入库时间*/
,P1.Etl_Job /*作业名称*/
,P1.Etl_Proc_Dt /*本次入库时间*/
,P1.Etl_Tx_Dt /*作业运行时间*/
,P1.Src_Sysname /*来源系统*/
,P1.Src_Table /*来源表*/
FROM cust_enagement_records_agi_CUR_I P1
LEFT JOIN :MARTDB.cust_enagement_records P2
ON P1.full_name = P2.full_name
AND P1.record_type = P2.record_type
AND P1.contact_id = P2.contact_id
AND P1.record_id = P2.record_id
AND P1.enagement_record = P2.enagement_record
AND P1.record_time = P2.record_time
WHERE P2.full_name IS NULL
OR P2.record_type IS NULL
OR P2.contact_id IS NULL
OR P2.record_id IS NULL
OR P2.enagement_record IS NULL
OR P2.record_time IS NULL
;
/*将新增数据插入到目标表 */
;INSERT INTO :MARTDB.cust_enagement_records (
full_name /*全名*/
,record_type /*记录类型*/
,contact_id /*客户编号*/
,record_id /*记录编号*/
,enagement_record /*接触记录*/
,record_time /*记录时间*/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Job /*作业名称*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
)
SELECT
P1.full_name /*全名*/
,P1.record_type /*记录类型*/
,P1.contact_id /*客户编号*/
,P1.record_id /*记录编号*/
,P1.enagement_record /*接触记录*/
,P1.record_time /*记录时间*/
,P1.Etl_Batch_No /*作业批次号*/
,P1.Etl_First_Dt /*最初入库时间*/
,P1.Etl_Job /*作业名称*/
,P1.Etl_Proc_Dt /*本次入库时间*/
,P1.Etl_Tx_Dt /*作业运行时间*/
,P1.Src_Sysname /*来源系统*/
,P1.Src_Table /*来源表*/
FROM cust_enagement_records_agi_INS P1
ON CONFLICT ( contact_id,record_id,enagement_record,record_time)
DO UPDATE SET
contact_id=excluded.contact_id
,record_id=excluded.record_id
,enagement_record=excluded.enagement_record
,record_time=excluded.record_time
,full_name=excluded.full_name
,record_type=excluded.record_type
,Etl_Batch_No=excluded.Etl_Batch_No
,Etl_First_Dt=excluded.Etl_First_Dt
,Etl_Job=excluded.Etl_Job
,Etl_Proc_Dt=excluded.Etl_Proc_Dt
,Etl_Tx_Dt=excluded.Etl_Tx_Dt
,Src_Sysname=excluded.Src_Sysname
,Src_Table=excluded.Src_Table
;
/*****程序结束退出 */
\q

View File

@ -0,0 +1,32 @@
DROP TABLE IF EXISTS p30_common.cust_leads_detail;
CREATE TABLE IF NOT EXISTS p30_common.cust_leads_detail (
contact_channel varchar(50)
, contact_account varchar(50)
, label varchar(50)
, event varchar(100)
, active_time timestamp(0)
, Src_Sysname varchar(50)
, Src_Table varchar(50)
, Etl_Job varchar(200)
, Etl_First_Dt timestamp(0)
, Etl_Tx_Dt timestamp(0)
, Etl_Proc_Dt timestamp(0)
, Etl_Batch_No varchar(50)
,primary key( contact_channel,contact_account,label,active_time )
);
COMMENT ON COLUMN p30_common.cust_leads_detail.contact_channel IS '线索渠道';
COMMENT ON COLUMN p30_common.cust_leads_detail.contact_account IS '线索编号';
COMMENT ON COLUMN p30_common.cust_leads_detail.label IS '标签';
COMMENT ON COLUMN p30_common.cust_leads_detail.event IS '事件';
COMMENT ON COLUMN p30_common.cust_leads_detail.active_time IS '活跃时间';
COMMENT ON COLUMN p30_common.cust_leads_detail.Src_Sysname IS '来源系统';
COMMENT ON COLUMN p30_common.cust_leads_detail.Src_Table IS '来源表';
COMMENT ON COLUMN p30_common.cust_leads_detail.Etl_Job IS '作业名称';
COMMENT ON COLUMN p30_common.cust_leads_detail.Etl_First_Dt IS '最初入库时间';
COMMENT ON COLUMN p30_common.cust_leads_detail.Etl_Tx_Dt IS '作业运行时间';
COMMENT ON COLUMN p30_common.cust_leads_detail.Etl_Proc_Dt IS '本次入库时间';
COMMENT ON COLUMN p30_common.cust_leads_detail.Etl_Batch_No IS '作业批次号';
COMMENT ON TABLE p30_common.cust_leads_detail IS '客户线索明细';

View File

@ -0,0 +1,329 @@
/***************************************************************************************************/
/*script in Sql, generate by SdmCreateScript 2020(by Qihang Feng, QF255001@TERADATA.COM) */
/*VERSION 01.10 revised on 2020-08-25 */
/*Brilliance stems from wisdoms. */
/*************Head Section**************************************************************************/
/*Script Use: Periodically load data to :cust_leads_detail(客户线索明细) */
/*Create Date:2024-06-04 11:19:40 */
/*SDM Developed By: dev */
/*SDM Developed Date: 2024-05-31 */
/*SDM Checked By: dev */
/*SDM Checked Date: 2024-06-04 */
/*Script Developed By: dev */
/*Script Checked By: dev */
/*Source table 1: :PDMDB.t01_scrm_leads_his */
/*Source table 2: :PDMDB.t01_udesk_record */
/*Source table 3: :PDMDB.t01_livechat_record */
/*Source table 4: :PDMDB.t01_scrm_leads */
/*Source table 5: :PDMDB.t01_crm_raw_leads */
/*Job Type: Inbound transform (Tier 1 to Tier 2) */
/*Target Table:cust_leads_detail */
/*ETL Job Name:cust_leads_detail */
/*ETL Frequency:Daily */
/*ETL Policy:F2 */
/********************************************************************************************/
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
/*创建临时表加载当前数据 */
CREATE TEMPORARY TABLE cust_leads_detail_agi_CUR_I
( LIKE :COMMDB.cust_leads_detail)
ON COMMIT PRESERVE ROWS;
/*创建临时表加载不同数据 */
CREATE TEMPORARY TABLE cust_leads_detail_agi_INS
( LIKE :COMMDB.cust_leads_detail)
ON COMMIT PRESERVE ROWS;
/*****************************************************************************************************/
/* GROUP 1:Source Table:t01_crm_raw_leads*************************************************************/
/*****************************************************************************************************/
INSERT INTO cust_leads_detail_agi_CUR_I (
contact_channel /*线索渠道*/
,contact_account /*线索编号*/
,label /*标签*/
,event /*事件*/
,active_time /*活跃时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
,Etl_Job /*作业名称*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Batch_No /*作业批次号*/
)
SELECT
'CRM Raw Leads' /*contact_channel*/
,COALESCE(TRIM(CAST(p0.crm_contact_account AS varchar(50))),'') /*contact_account*/
,COALESCE(TRIM(CAST(p0.lead_number AS varchar(50))),'') /*label*/
,COALESCE(TRIM(CAST(p0.market_type AS varchar(100))),'') /*event*/
,COALESCE(p0.create_time,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*active_time*/
,Substr('t01_crm_raw_leads',1,3) /*Src_Sysname*/
,'t01_crm_raw_leads' /*Src_Table*/
,:ETLJOB /*Etl_Job*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_Tx_Dt*/
,current_timestamp(0) /*Etl_Proc_Dt*/
,0 /*Etl_Batch_No*/
FROM :PDMDB.t01_crm_raw_leads p0
;
/*****************************************************************************************************/
/* GROUP 2:Source Table:t01_scrm_leads_his************************************************************/
/*****************************************************************************************************/
INSERT INTO cust_leads_detail_agi_CUR_I (
contact_channel /*线索渠道*/
,contact_account /*线索编号*/
,label /*标签*/
,event /*事件*/
,active_time /*活跃时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
,Etl_Job /*作业名称*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Batch_No /*作业批次号*/
)
SELECT
'SCRM Event' /*contact_channel*/
,COALESCE(TRIM(CAST(p0.cue_id AS varchar(50))),'') /*contact_account*/
,COALESCE(TRIM(CAST(p0.activate_name AS varchar(50))),'') /*label*/
,COALESCE(TRIM(CAST(p0.event_name AS varchar(100))),'') /*event*/
,COALESCE(p0.activate_time,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*active_time*/
,Substr('t01_scrm_leads_his',1,3) /*Src_Sysname*/
,'t01_scrm_leads_his' /*Src_Table*/
,:ETLJOB /*Etl_Job*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_Tx_Dt*/
,current_timestamp(0) /*Etl_Proc_Dt*/
,0 /*Etl_Batch_No*/
FROM :PDMDB.t01_scrm_leads_his p0
;
/*****************************************************************************************************/
/* GROUP 3:Source Table:t01_scrm_leads****************************************************************/
/*****************************************************************************************************/
INSERT INTO cust_leads_detail_agi_CUR_I (
contact_channel /*线索渠道*/
,contact_account /*线索编号*/
,label /*标签*/
,event /*事件*/
,active_time /*活跃时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
,Etl_Job /*作业名称*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Batch_No /*作业批次号*/
)
SELECT
'SCRM Event' /*contact_channel*/
,COALESCE(TRIM(CAST(p0.customer_id AS varchar(50))),'') /*contact_account*/
,COALESCE(TRIM(CAST(p0.label AS varchar(50))),'') /*label*/
,COALESCE(TRIM(CAST(p0.target_name AS varchar(100))),'') /*event*/
,COALESCE(p0.date,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*active_time*/
,Substr('t01_scrm_leads',1,3) /*Src_Sysname*/
,'t01_scrm_leads' /*Src_Table*/
,:ETLJOB /*Etl_Job*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_Tx_Dt*/
,current_timestamp(0) /*Etl_Proc_Dt*/
,0 /*Etl_Batch_No*/
FROM :PDMDB.t01_scrm_leads p0
;
/*****************************************************************************************************/
/* GROUP 4:Source Table:t01_udesk_record**************************************************************/
/*****************************************************************************************************/
INSERT INTO cust_leads_detail_agi_CUR_I (
contact_channel /*线索渠道*/
,contact_account /*线索编号*/
,label /*标签*/
,event /*事件*/
,active_time /*活跃时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
,Etl_Job /*作业名称*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Batch_No /*作业批次号*/
)
SELECT
'Call' /*contact_channel*/
,COALESCE(TRIM(p0.call_id),'') /*contact_account*/
,COALESCE(TRIM(CAST(p0.type AS varchar(50))),'') /*label*/
,case when p0.responsible_group = 'MKT' then '产品报价'
when p0.responsible_group = 'IAM' then '产品报价'
when p0.responsible_group = 'TSC-KEI' then '技术支持'
when p0.responsible_group = 'TSC-TEK' then '技术支持'
when p0.responsible_group = 'SSO-TEK' then '维修与校准'
else '其他' end /*event*/
,COALESCE(p0.record_time,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*active_time*/
,Substr('t01_udesk_record',1,3) /*Src_Sysname*/
,'t01_udesk_record' /*Src_Table*/
,:ETLJOB /*Etl_Job*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_Tx_Dt*/
,current_timestamp(0) /*Etl_Proc_Dt*/
,0 /*Etl_Batch_No*/
FROM :PDMDB.t01_udesk_record p0
;
/*****************************************************************************************************/
/* GROUP 5:Source Table:t01_livechat_record***********************************************************/
/*****************************************************************************************************/
INSERT INTO cust_leads_detail_agi_CUR_I (
contact_channel /*线索渠道*/
,contact_account /*线索编号*/
,label /*标签*/
,event /*事件*/
,active_time /*活跃时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
,Etl_Job /*作业名称*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Batch_No /*作业批次号*/
)
SELECT
'Chat' /*contact_channel*/
,COALESCE(TRIM(CAST(p0.livechat_leads_id AS varchar(50))),'') /*contact_account*/
,COALESCE(TRIM(p0.pre_chat_service),'') /*label*/
,case when p0.pre_chat_service like '%维修%' then '维修与校准'
when p0.pre_chat_service like '%技术%' then '技术支持'
when p0.pre_chat_service like '%售后%' then '售后服务'
when p0.pre_chat_service like '%报价%' then '产品报价'
else '其他' end /*event*/
,COALESCE(p0.chat_creation_time,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*active_time*/
,Substr('t01_livechat_record',1,3) /*Src_Sysname*/
,'t01_livechat_record' /*Src_Table*/
,:ETLJOB /*Etl_Job*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_Tx_Dt*/
,current_timestamp(0) /*Etl_Proc_Dt*/
,0 /*Etl_Batch_No*/
FROM :PDMDB.t01_livechat_record p0
;
/*将不同数据插入到临时表 */
;INSERT INTO cust_leads_detail_agi_INS (
event /*事件*/
,contact_channel /*线索渠道*/
,contact_account /*线索编号*/
,label /*标签*/
,active_time /*活跃时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
,Etl_Job /*作业名称*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Batch_No /*作业批次号*/
)
SELECT
P1.event /*事件*/
,P1.contact_channel /*线索渠道*/
,P1.contact_account /*线索编号*/
,P1.label /*标签*/
,P1.active_time /*活跃时间*/
,P1.Src_Sysname /*来源系统*/
,P1.Src_Table /*来源表*/
,P1.Etl_Job /*作业名称*/
,P1.Etl_First_Dt /*最初入库时间*/
,P1.Etl_Tx_Dt /*作业运行时间*/
,P1.Etl_Proc_Dt /*本次入库时间*/
,P1.Etl_Batch_No /*作业批次号*/
FROM cust_leads_detail_agi_CUR_I P1
LEFT JOIN :COMMDB.cust_leads_detail P2
ON P1.event = P2.event
AND P1.contact_channel = P2.contact_channel
AND P1.contact_account = P2.contact_account
AND P1.label = P2.label
AND P1.active_time = P2.active_time
WHERE P2.event IS NULL
OR P2.contact_channel IS NULL
OR P2.contact_account IS NULL
OR P2.label IS NULL
OR P2.active_time IS NULL
;
/*将新增数据插入到目标表 */
;INSERT INTO :COMMDB.cust_leads_detail (
event /*事件*/
,contact_channel /*线索渠道*/
,contact_account /*线索编号*/
,label /*标签*/
,active_time /*活跃时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
,Etl_Job /*作业名称*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Batch_No /*作业批次号*/
)
SELECT
P1.event /*事件*/
,P1.contact_channel /*线索渠道*/
,P1.contact_account /*线索编号*/
,P1.label /*标签*/
,P1.active_time /*活跃时间*/
,P1.Src_Sysname /*来源系统*/
,P1.Src_Table /*来源表*/
,P1.Etl_Job /*作业名称*/
,P1.Etl_First_Dt /*最初入库时间*/
,P1.Etl_Tx_Dt /*作业运行时间*/
,P1.Etl_Proc_Dt /*本次入库时间*/
,P1.Etl_Batch_No /*作业批次号*/
FROM cust_leads_detail_agi_INS P1
ON CONFLICT ( contact_channel,contact_account,label,active_time)
DO UPDATE SET
contact_channel=excluded.contact_channel
,contact_account=excluded.contact_account
,label=excluded.label
,active_time=excluded.active_time
,event=excluded.event
,Src_Sysname=excluded.Src_Sysname
,Src_Table=excluded.Src_Table
,Etl_Job=excluded.Etl_Job
,Etl_First_Dt=excluded.Etl_First_Dt
,Etl_Tx_Dt=excluded.Etl_Tx_Dt
,Etl_Proc_Dt=excluded.Etl_Proc_Dt
,Etl_Batch_No=excluded.Etl_Batch_No
;
/*****程序结束退出 */
\q

View File

@ -7,6 +7,7 @@ from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import json
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
@ -228,6 +229,22 @@ params={'my_param':"data_source_update_agi"},
depends_on_past=False,
retries=3,
dag=dag)
cust_leads_detail = SSHOperator(
ssh_hook=sshHook,
task_id='cust_leads_detail',
command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"cust_leads_detail_agi"},
depends_on_past=False,
retries=3,
dag=dag)
cust_enagement_records = SSHOperator(
ssh_hook=sshHook,
task_id='cust_enagement_records',
command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"cust_enagement_records_agi"},
depends_on_past=False,
retries=3,
dag=dag)
file_Tk01 >> livechat_6381
file_Udesk_record >> udesk_record_3768
file_CRM_Raw_Leads >> crm_raw_leads_6024
@ -248,4 +265,7 @@ t01_crm_raw_leads >> cust_leads
cust_leads >> cust_all_info
cust_contact_info >> cust_all_info
cust_all_info >> data_source_update
cust_all_info >> cust_leads_detail
cust_leads_detail >> cust_enagement_records
cust_enagement_records >> data_source_update
data_source_update >> task_failed