add workflow 市场易API联系人,dev

This commit is contained in:
root 2025-11-03 11:09:12 +08:00
parent 99bc8be6bd
commit 250e1b7315
2 changed files with 149 additions and 75 deletions

View File

@ -1,32 +1,34 @@
DROP TABLE IF EXISTS p60_mart.cust_enagement_records;
CREATE TABLE IF NOT EXISTS p60_mart.cust_enagement_records (
contact_id VARCHAR(20)
, full_name VARCHAR(200)
, record_type VARCHAR(30)
, enagement_record VARCHAR(500)
contact_id varchar(20)
, full_name varchar(200)
, record_type varchar(30)
, record_id varchar(50)
, enagement_record varchar(500)
, record_time timestamp(0)
, etl_batch_no VARCHAR(50)
, etl_first_dt timestamp(0)
, etl_job VARCHAR(200)
, etl_proc_dt timestamp(0)
, etl_tx_dt timestamp(0)
, src_sysname VARCHAR(50)
, src_table VARCHAR(50)
,primary key( contact_id,enagement_record,record_time )
, Etl_Batch_No varchar(50)
, Etl_First_Dt timestamp(0)
, Etl_Job varchar(200)
, Etl_Proc_Dt timestamp(0)
, Etl_Tx_Dt timestamp(0)
, Src_Sysname varchar(50)
, Src_Table varchar(50)
,primary key( contact_id,record_id,enagement_record,record_time )
);
COMMENT ON COLUMN p60_mart.cust_enagement_records.contact_id IS '客户编号';
COMMENT ON COLUMN p60_mart.cust_enagement_records.full_name IS '全名';
COMMENT ON COLUMN p60_mart.cust_enagement_records.record_type IS '记录类型';
COMMENT ON COLUMN p60_mart.cust_enagement_records.record_id IS '记录编号';
COMMENT ON COLUMN p60_mart.cust_enagement_records.enagement_record IS '接触记录';
COMMENT ON COLUMN p60_mart.cust_enagement_records.record_time IS '记录时间';
COMMENT ON COLUMN p60_mart.cust_enagement_records.etl_batch_no IS '作业批次号';
COMMENT ON COLUMN p60_mart.cust_enagement_records.etl_first_dt IS '最初入库时间';
COMMENT ON COLUMN p60_mart.cust_enagement_records.etl_job IS '作业名称';
COMMENT ON COLUMN p60_mart.cust_enagement_records.etl_proc_dt IS '本次入库时间';
COMMENT ON COLUMN p60_mart.cust_enagement_records.etl_tx_dt IS '作业运行时间';
COMMENT ON COLUMN p60_mart.cust_enagement_records.src_sysname IS '来源系统';
COMMENT ON COLUMN p60_mart.cust_enagement_records.src_table IS '来源表';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Etl_Batch_No IS '作业批次号';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Etl_First_Dt IS '最初入库时间';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Etl_Job IS '作业名称';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Etl_Proc_Dt IS '本次入库时间';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Etl_Tx_Dt IS '作业运行时间';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Src_Sysname IS '来源系统';
COMMENT ON COLUMN p60_mart.cust_enagement_records.Src_Table IS '来源表';
COMMENT ON TABLE p60_mart.cust_enagement_records IS '客户接触记录';

View File

@ -4,11 +4,11 @@
/*Brilliance stems from wisdoms. */
/*************Head Section**************************************************************************/
/*Script Use: Periodically load data to :cust_enagement_records(客户接触记录) */
/*Create Date:2024-06-17 11:18:28 */
/*Create Date:2024-06-12 10:43:33 */
/*SDM Developed By: dev */
/*SDM Developed Date: 2024-06-17 */
/*SDM Developed Date: 2024-06-04 */
/*SDM Checked By: dev */
/*SDM Checked Date: 2024-06-17 */
/*SDM Checked Date: 2024-06-12 */
/*Script Developed By: dev */
/*Script Checked By: dev */
/*Source table 1: :COMMDB.cust_leads_detail */
@ -18,7 +18,7 @@
/*Target Table:cust_enagement_records */
/*ETL Job Name:cust_enagement_records */
/*ETL Frequency:Daily */
/*ETL Policy:F1 */
/*ETL Policy:F2 */
/********************************************************************************************/
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
@ -33,6 +33,13 @@ CREATE TEMPORARY TABLE cust_enagement_records_agi_CUR_I
ON COMMIT PRESERVE ROWS;
/*创建临时表加载不同数据 */
CREATE TEMPORARY TABLE cust_enagement_records_agi_INS
( LIKE :MARTDB.cust_enagement_records)
ON COMMIT PRESERVE ROWS;
/*****************************************************************************************************/
/* GROUP 1:Source Table:cust_leads_detail*************************************************************/
/*****************************************************************************************************/
@ -41,35 +48,37 @@ INSERT INTO cust_enagement_records_agi_CUR_I (
contact_id /*客户编号*/
,full_name /*全名*/
,record_type /*记录类型*/
,record_id /*记录编号*/
,enagement_record /*接触记录*/
,record_time /*记录时间*/
,etl_batch_no /*作业批次号*/
,etl_first_dt /*最初入库时间*/
,etl_job /*作业名称*/
,etl_proc_dt /*本次入库时间*/
,etl_tx_dt /*作业运行时间*/
,src_sysname /*来源系统*/
,src_table /*来源表*/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Job /*作业名称*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
)
SELECT
coalesce(p2.contact_id,'') /*contact_id*/
,coalesce(p2.full_name,'') /*full_name*/
,COALESCE(TRIM(CAST(p0.contact_channel AS VARCHAR(30))),'') /*record_type*/
,COALESCE(TRIM(CAST(p0.contact_channel AS varchar(30))),'') /*record_type*/
,case when p0.contact_channel = 'CRM Raw Leads' then p0."label"
else p0.contact_account
end /*enagement_record*/
end /*record_id*/
,case when p0.contact_channel = 'CRM Raw Leads' then p0.contact_channel ||'_'|| p0."label" ||'_'|| p0."event"
when p0.contact_channel = 'Chat' then p0.contact_channel ||'_'|| p0."event"
when p0.contact_channel = 'Call' then p0.contact_channel ||'_'|| p0."label" ||'_'|| p0."event"
when p0.contact_channel = 'SCRM Event' then p0.contact_channel ||'_'|| p0."label" ||'_'|| p0."event"
end /*record_time*/
,0 /*etl_batch_no*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*etl_first_dt*/
,:ETLJOB /*etl_job*/
,current_timestamp(0) /*etl_proc_dt*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*etl_tx_dt*/
,Substr('cust_leads_detail',1,3) /*src_sysname*/
,'cust_leads_detail' /*src_table*/
end /*enagement_record*/
,COALESCE(p0.active_time,TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*record_time*/
,0 /*Etl_Batch_No*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
,:ETLJOB /*Etl_Job*/
,current_timestamp(0) /*Etl_Proc_Dt*/
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_Tx_Dt*/
,Substr('cust_leads_detail',1,3) /*Src_Sysname*/
,'cust_leads_detail' /*Src_Table*/
FROM :COMMDB.cust_leads_detail p0
LEFT JOIN :COMMDB.v_cust_contact_mapping p1
@ -81,49 +90,112 @@ when p0.contact_channel = 'SCRM Event' then 'SCRM'
end
LEFT JOIN (select distinct on (contact_id) contact_id,full_name,contact_channel from p30_COMMON.cust_contact_info) p2
ON p1.contact_id=p2.contact_id
where coalesce(p2.contact_id,'')<>''
;
/*从目标表中删除所有数据 cust_enagement_records(客户接触记录) */
DELETE FROM :MARTDB.cust_enagement_records
WHERE ETL_JOB=:ETLJOB;
/*将不同数据插入到临时表 */
;INSERT INTO cust_enagement_records_agi_INS (
full_name /*全名*/
,record_type /*记录类型*/
,contact_id /*客户编号*/
,record_id /*记录编号*/
,enagement_record /*接触记录*/
,record_time /*记录时间*/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Job /*作业名称*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
)
SELECT
P1.full_name /*全名*/
,P1.record_type /*记录类型*/
,P1.contact_id /*客户编号*/
,P1.record_id /*记录编号*/
,P1.enagement_record /*接触记录*/
,P1.record_time /*记录时间*/
,P1.Etl_Batch_No /*作业批次号*/
,P1.Etl_First_Dt /*最初入库时间*/
,P1.Etl_Job /*作业名称*/
,P1.Etl_Proc_Dt /*本次入库时间*/
,P1.Etl_Tx_Dt /*作业运行时间*/
,P1.Src_Sysname /*来源系统*/
,P1.Src_Table /*来源表*/
FROM cust_enagement_records_agi_CUR_I P1
LEFT JOIN :MARTDB.cust_enagement_records P2
ON P1.full_name = P2.full_name
AND P1.record_type = P2.record_type
AND P1.contact_id = P2.contact_id
AND P1.record_id = P2.record_id
AND P1.enagement_record = P2.enagement_record
AND P1.record_time = P2.record_time
WHERE P2.full_name IS NULL
OR P2.record_type IS NULL
OR P2.contact_id IS NULL
OR P2.record_id IS NULL
OR P2.enagement_record IS NULL
OR P2.record_time IS NULL
;
/*将新增数据插入到目标表 */
;INSERT INTO :MARTDB.cust_enagement_records (
full_name /*全名*/
,record_type /*记录类型*/
,contact_id /*客户编号*/
,enagement_record /*接触记录*/
,record_time /*记录时间*/
,etl_batch_no /*作业批次号*/
,etl_first_dt /*最初入库时间*/
,etl_job /*作业名称*/
,etl_proc_dt /*本次入库时间*/
,etl_tx_dt /*作业运行时间*/
,src_sysname /*来源系统*/
,src_table /*来源表*/
)
SELECT
P1.full_name /*全名*/
,P1.record_type /*记录类型*/
,P1.contact_id /*客户编号*/
,P1.enagement_record /*接触记录*/
,P1.record_time /*记录时间*/
,P1.etl_batch_no /*作业批次号*/
,P1.etl_first_dt /*最初入库时间*/
,P1.etl_job /*作业名称*/
,P1.etl_proc_dt /*本次入库时间*/
,P1.etl_tx_dt /*作业运行时间*/
,P1.src_sysname /*来源系统*/
,P1.src_table /*来源表*/
FROM cust_enagement_records_agi_CUR_I P1
full_name /*全名*/
,record_type /*记录类型*/
,contact_id /*客户编号*/
,record_id /*记录编号*/
,enagement_record /*接触记录*/
,record_time /*记录时间*/
,Etl_Batch_No /*作业批次号*/
,Etl_First_Dt /*最初入库时间*/
,Etl_Job /*作业名称*/
,Etl_Proc_Dt /*本次入库时间*/
,Etl_Tx_Dt /*作业运行时间*/
,Src_Sysname /*来源系统*/
,Src_Table /*来源表*/
)
SELECT
P1.full_name /*全名*/
,P1.record_type /*记录类型*/
,P1.contact_id /*客户编号*/
,P1.record_id /*记录编号*/
,P1.enagement_record /*接触记录*/
,P1.record_time /*记录时间*/
,P1.Etl_Batch_No /*作业批次号*/
,P1.Etl_First_Dt /*最初入库时间*/
,P1.Etl_Job /*作业名称*/
,P1.Etl_Proc_Dt /*本次入库时间*/
,P1.Etl_Tx_Dt /*作业运行时间*/
,P1.Src_Sysname /*来源系统*/
,P1.Src_Table /*来源表*/
FROM cust_enagement_records_agi_INS P1
ON CONFLICT ( contact_id,record_id,enagement_record,record_time)
DO UPDATE SET
contact_id=excluded.contact_id
,record_id=excluded.record_id
,enagement_record=excluded.enagement_record
,record_time=excluded.record_time
,full_name=excluded.full_name
,record_type=excluded.record_type
,Etl_Batch_No=excluded.Etl_Batch_No
,Etl_First_Dt=excluded.Etl_First_Dt
,Etl_Job=excluded.Etl_Job
,Etl_Proc_Dt=excluded.Etl_Proc_Dt
,Etl_Tx_Dt=excluded.Etl_Tx_Dt
,Src_Sysname=excluded.Src_Sysname
,Src_Table=excluded.Src_Table
;
/*****程序结束退出 */
\q