add workflow 天润Smart-ccc客户数据,dev
This commit is contained in:
parent
2c3d121b9d
commit
0b169c853b
|
@ -0,0 +1,152 @@
|
||||||
|
# coding: utf-8
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import psycopg2
|
||||||
|
import uuid
|
||||||
|
import datetime
|
||||||
|
import time
|
||||||
|
import hashlib
|
||||||
|
import time
|
||||||
|
|
||||||
|
#荟聚
|
||||||
|
|
||||||
|
#全局变量,便于参数使用的预设值
|
||||||
|
current_date = datetime.date.today() # 获取当前日期
|
||||||
|
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||||
|
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||||
|
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||||
|
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||||
|
|
||||||
|
auth_url='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||||
|
auth = {}
|
||||||
|
identityType = 'cp_employee_tools_huiju_corp'
|
||||||
|
sel_sql = "select * from p61_output.ccc_scrm_cust_label_info where sync_ind <> '2';"
|
||||||
|
upt_sql = "update p61_output.ccc_scrm_cust_label_info set sync_ind=%s, etl_proc_dt=current_timestamp(0) where cust_id = %s and scrm_label_id = %s"
|
||||||
|
ist_sql = "INSERT INTO p61_output.scrm_leads_external_id_mapping (leads_id, external_type, external_id, etl_proc_dt, etl_tx_dt) VALUES (%s, %s, %s,current_timestamp(0), current_date);"
|
||||||
|
|
||||||
|
database="daas_mpp"
|
||||||
|
user="dbuser_mpp"
|
||||||
|
password="qwF0GX8"
|
||||||
|
host="172.17.0.8"
|
||||||
|
port="5432"
|
||||||
|
|
||||||
|
def formatted2_previous_hour(h):
|
||||||
|
if h==0:
|
||||||
|
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
|
||||||
|
# 减去一个小时,得到前一个小时的开始时间
|
||||||
|
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
|
||||||
|
return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
|
def get_token(url):
|
||||||
|
#请求鉴权接口
|
||||||
|
authRequest=requests.get(url)
|
||||||
|
#解析结果
|
||||||
|
if not authRequest: #若为空时,返回空
|
||||||
|
return get_token(url)
|
||||||
|
auth=json.loads(authRequest.text)
|
||||||
|
while 'error' in auth and i < 60:
|
||||||
|
time.sleep(60)
|
||||||
|
auth = get_token(url)
|
||||||
|
i = i + 1
|
||||||
|
return auth
|
||||||
|
|
||||||
|
def get_data_from_db(cur, conn, sql):
|
||||||
|
# conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
|
||||||
|
# cur=conn.cursor()
|
||||||
|
cur.execute(sql)
|
||||||
|
records = cur.fetchall()
|
||||||
|
conn.commit()
|
||||||
|
# cur.close()
|
||||||
|
# conn.close()
|
||||||
|
return records
|
||||||
|
|
||||||
|
def update_db_flag(cur, conn, sql, status, cust_id, tag_id):
|
||||||
|
# conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
|
||||||
|
# cur=conn.cursor()
|
||||||
|
cur.execute(sql,[status,cust_id,tag_id])
|
||||||
|
conn.commit()
|
||||||
|
# cur.close()
|
||||||
|
# conn.close()
|
||||||
|
|
||||||
|
def add_leads_id(cur, conn, sql, type, leads_id, extern_id):
|
||||||
|
try:
|
||||||
|
# conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
|
||||||
|
# cur=conn.cursor()
|
||||||
|
cur.execute(sql,[leads_id,type,extern_id])
|
||||||
|
conn.commit()
|
||||||
|
# cur.close()
|
||||||
|
# conn.close()
|
||||||
|
except Exception as e:
|
||||||
|
print(f'添加线索id异常,接口返回信息:{e}')
|
||||||
|
|
||||||
|
def request_custom_id(id):
|
||||||
|
print(f'开始请求线索id:{id}')
|
||||||
|
header={}
|
||||||
|
url='https://api.huiju.cool/v2/customerService/findCustomerByIdentity'
|
||||||
|
body={'access_token':auth['access_token'],'identityType':identityType,'identityValue':id}
|
||||||
|
dataReqL=requests.get(url,headers=header,params=body)
|
||||||
|
resL=json.loads(dataReqL.text)
|
||||||
|
return resL
|
||||||
|
|
||||||
|
def request_update_tags(idStr, tagId ):
|
||||||
|
print(f'开始请求更新线索标签{idStr}|{tagId}')
|
||||||
|
header={}
|
||||||
|
access_token = auth['access_token']
|
||||||
|
url=f'https://api.huiju.cool/v2/customerTags/bulkAdd?access_token={access_token}'
|
||||||
|
body=[{'tagId':tagId, 'customerIds':[idStr]}]
|
||||||
|
jsonData = json.dumps(body)
|
||||||
|
print(f'body: {jsonData}')
|
||||||
|
dataReqL=requests.post(url,headers=header,data=jsonData)
|
||||||
|
resL=json.loads(dataReqL.text)
|
||||||
|
return resL
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
print(f'{formatted2_previous_hour(0)}开始更新标签信息')
|
||||||
|
auth = get_token(auth_url)
|
||||||
|
records = []
|
||||||
|
conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
|
||||||
|
cur=conn.cursor()
|
||||||
|
records = get_data_from_db(cur, conn, sel_sql)
|
||||||
|
if len(records) > 0:
|
||||||
|
print(f'此处需处理{len(records)}条标签信息')
|
||||||
|
for data in records:
|
||||||
|
custId = data[0]
|
||||||
|
id = data[1]
|
||||||
|
idStr = data[2]
|
||||||
|
tagId = data[5]
|
||||||
|
print(f'开始处理客户({id}:{tagId})标签信息')
|
||||||
|
try:
|
||||||
|
f = False
|
||||||
|
s = '2'
|
||||||
|
if not idStr:
|
||||||
|
resL = request_custom_id(id)
|
||||||
|
if 'error' in resL and resL['error']['code'] == 400010:
|
||||||
|
get_token(auth_url)
|
||||||
|
resL = request_custom_id(id)
|
||||||
|
if 'error' in resL:
|
||||||
|
print(f'获取客户线索id异常,接口返回信息:{resL}')
|
||||||
|
continue
|
||||||
|
idStr = resL['idStr']
|
||||||
|
if idStr == '':
|
||||||
|
print(f'获取客户线索id为空,接口返回信息:{resL}')
|
||||||
|
continue
|
||||||
|
f=True
|
||||||
|
if f:
|
||||||
|
add_leads_id(cur, conn, ist_sql,identityType, idStr, id)
|
||||||
|
resU = request_update_tags(idStr,tagId)
|
||||||
|
if 'error' in resU and resU['error']['code'] == 400010:
|
||||||
|
get_token(auth_url)
|
||||||
|
resU = request_update_tags(idStr,tagId)
|
||||||
|
if 'error' in resU:
|
||||||
|
print(f'获取客户线索id异常,接口返回信息:{resU}')
|
||||||
|
continue
|
||||||
|
update_db_flag(cur, conn, upt_sql,s,custId,tagId)
|
||||||
|
except Exception as e:
|
||||||
|
print(f'处理客户({id}:{tagId})标签信息,异常信息:{e}')
|
||||||
|
else:
|
||||||
|
print(f'记录数为0,此处无需处理标签信息')
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
|
print(f'{formatted2_previous_hour(0)}更新标签信息结束')
|
|
@ -0,0 +1,34 @@
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS p20_pdm.t01_scrm_label;
|
||||||
|
CREATE TABLE IF NOT EXISTS p20_pdm.t01_scrm_label (
|
||||||
|
scrm_label_id varchar(50)
|
||||||
|
, scrm_label_name varchar(50)
|
||||||
|
, scrm_label_group varchar(10)
|
||||||
|
, scrm_label_type varchar(20)
|
||||||
|
, created_tm timestamp(0)
|
||||||
|
, updated_tm timestamp(0)
|
||||||
|
, Etl_Batch_No varchar(50)
|
||||||
|
, Etl_First_Dt timestamp(0)
|
||||||
|
, Etl_Job varchar(200)
|
||||||
|
, Etl_Proc_Dt timestamp(0)
|
||||||
|
, Etl_Tx_Dt timestamp(0)
|
||||||
|
, Src_Sysname varchar(50)
|
||||||
|
, Src_Table varchar(50)
|
||||||
|
,primary key( scrm_label_id )
|
||||||
|
);
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.scrm_label_id IS '标签对应Id';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.scrm_label_name IS '标签名';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.scrm_label_group IS '标签分组名';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.scrm_label_type IS '标签类型';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.created_tm IS '创建时间';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.updated_tm IS '上次更新时间';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.Etl_Batch_No IS '作业批次号';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.Etl_First_Dt IS '最初入库时间';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.Etl_Job IS '作业名称';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.Etl_Proc_Dt IS '本次入库时间';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.Etl_Tx_Dt IS '作业运行时间';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.Src_Sysname IS '来源系统';
|
||||||
|
COMMENT ON COLUMN p20_pdm.t01_scrm_label.Src_Table IS '来源表';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p20_pdm.t01_scrm_label IS 'SCRM标签信息';
|
|
@ -0,0 +1,181 @@
|
||||||
|
/***************************************************************************************************/
|
||||||
|
/*script in Sql, generate by SdmCreateScript 2020(by Qihang Feng, QF255001@TERADATA.COM) */
|
||||||
|
/*VERSION 01.10 revised on 2020-08-25 */
|
||||||
|
/*Brilliance stems from wisdoms. */
|
||||||
|
/*************Head Section**************************************************************************/
|
||||||
|
/*Script Use: Periodically load data to :t01_scrm_label(SCRM标签信息) */
|
||||||
|
/*Create Date:2024-07-11 18:22:36 */
|
||||||
|
/*SDM Developed By: dev */
|
||||||
|
/*SDM Developed Date: 2024-07-11 */
|
||||||
|
/*SDM Checked By: dev */
|
||||||
|
/*SDM Checked Date: 2024-07-11 */
|
||||||
|
/*Script Developed By: dev */
|
||||||
|
/*Script Checked By: dev */
|
||||||
|
/*Source table 1: p10_sa.s98_s_scrm_tags */
|
||||||
|
/*Job Type: Inbound transform (Tier 1 to Tier 2) */
|
||||||
|
/*Target Table:t01_scrm_label */
|
||||||
|
/*ETL Job Name:t01_scrm_label */
|
||||||
|
/*ETL Frequency:Daily */
|
||||||
|
/*ETL Policy:F2 */
|
||||||
|
/********************************************************************************************/
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*创建临时表加载当前数据 */
|
||||||
|
CREATE TEMPORARY TABLE t01_scrm_label_agi_CUR_I
|
||||||
|
( LIKE :PDMDB.t01_scrm_label)
|
||||||
|
ON COMMIT PRESERVE ROWS;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*创建临时表加载不同数据 */
|
||||||
|
CREATE TEMPORARY TABLE t01_scrm_label_agi_INS
|
||||||
|
( LIKE :PDMDB.t01_scrm_label)
|
||||||
|
ON COMMIT PRESERVE ROWS;
|
||||||
|
|
||||||
|
|
||||||
|
/*****************************************************************************************************/
|
||||||
|
/* GROUP 1:Source Table:s98_s_scrm_tags***************************************************************/
|
||||||
|
/*****************************************************************************************************/
|
||||||
|
INSERT INTO t01_scrm_label_agi_CUR_I (
|
||||||
|
scrm_label_id /*标签对应Id*/
|
||||||
|
,scrm_label_name /*标签名*/
|
||||||
|
,scrm_label_group /*标签分组名*/
|
||||||
|
,scrm_label_type /*标签类型*/
|
||||||
|
,created_tm /*创建时间*/
|
||||||
|
,updated_tm /*上次更新时间*/
|
||||||
|
,Etl_Batch_No /*作业批次号*/
|
||||||
|
,Etl_First_Dt /*最初入库时间*/
|
||||||
|
,Etl_Job /*作业名称*/
|
||||||
|
,Etl_Proc_Dt /*本次入库时间*/
|
||||||
|
,Etl_Tx_Dt /*作业运行时间*/
|
||||||
|
,Src_Sysname /*来源系统*/
|
||||||
|
,Src_Table /*来源表*/
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
COALESCE(TRIM(CAST(p0.id AS varchar(50))),'') /*scrm_label_id*/
|
||||||
|
,COALESCE(TRIM(CAST(p0.name AS varchar(50))),'') /*scrm_label_name*/
|
||||||
|
,COALESCE(TRIM(CAST(p0.group_name AS varchar(10))),'') /*scrm_label_group*/
|
||||||
|
,COALESCE(TRIM(CAST(p0.type AS varchar(20))),'') /*scrm_label_type*/
|
||||||
|
,COALESCE(TO_TIMESTAMP(CAST(p0.date_created AS VARCHAR(20)),'YYYY-MM-DDTHH24:mi:ssZ'),TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*created_tm*/
|
||||||
|
,COALESCE(TO_TIMESTAMP(CAST(p0 .last_updated AS VARCHAR(20)),'YYYY-MM-DDTHH24:mi:ssZ'),TO_TIMESTAMP(:NULLDATE,'YYYYMMDD')) /*updated_tm*/
|
||||||
|
,0 /*Etl_Batch_No*/
|
||||||
|
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
|
||||||
|
,:ETLJOB /*Etl_Job*/
|
||||||
|
,current_timestamp(0) /*Etl_Proc_Dt*/
|
||||||
|
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_Tx_Dt*/
|
||||||
|
,Substr('s98_s_scrm_tags',1,3) /*Src_Sysname*/
|
||||||
|
,'s98_s_scrm_tags' /*Src_Table*/
|
||||||
|
|
||||||
|
FROM p10_sa.s98_s_scrm_tags p0
|
||||||
|
;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*将不同数据插入到临时表 */
|
||||||
|
;INSERT INTO t01_scrm_label_agi_INS (
|
||||||
|
scrm_label_name /*标签名*/
|
||||||
|
,scrm_label_group /*标签分组名*/
|
||||||
|
,scrm_label_type /*标签类型*/
|
||||||
|
,created_tm /*创建时间*/
|
||||||
|
,updated_tm /*上次更新时间*/
|
||||||
|
,scrm_label_id /*标签对应Id*/
|
||||||
|
,Etl_Batch_No /*作业批次号*/
|
||||||
|
,Etl_First_Dt /*最初入库时间*/
|
||||||
|
,Etl_Job /*作业名称*/
|
||||||
|
,Etl_Proc_Dt /*本次入库时间*/
|
||||||
|
,Etl_Tx_Dt /*作业运行时间*/
|
||||||
|
,Src_Sysname /*来源系统*/
|
||||||
|
,Src_Table /*来源表*/
|
||||||
|
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
P1.scrm_label_name /*标签名*/
|
||||||
|
,P1.scrm_label_group /*标签分组名*/
|
||||||
|
,P1.scrm_label_type /*标签类型*/
|
||||||
|
,P1.created_tm /*创建时间*/
|
||||||
|
,P1.updated_tm /*上次更新时间*/
|
||||||
|
,P1.scrm_label_id /*标签对应Id*/
|
||||||
|
,P1.Etl_Batch_No /*作业批次号*/
|
||||||
|
,P1.Etl_First_Dt /*最初入库时间*/
|
||||||
|
,P1.Etl_Job /*作业名称*/
|
||||||
|
,P1.Etl_Proc_Dt /*本次入库时间*/
|
||||||
|
,P1.Etl_Tx_Dt /*作业运行时间*/
|
||||||
|
,P1.Src_Sysname /*来源系统*/
|
||||||
|
,P1.Src_Table /*来源表*/
|
||||||
|
|
||||||
|
FROM t01_scrm_label_agi_CUR_I P1
|
||||||
|
LEFT JOIN :PDMDB.t01_scrm_label P2
|
||||||
|
ON P1.scrm_label_name = P2.scrm_label_name
|
||||||
|
AND P1.scrm_label_group = P2.scrm_label_group
|
||||||
|
AND P1.scrm_label_type = P2.scrm_label_type
|
||||||
|
AND P1.created_tm = P2.created_tm
|
||||||
|
AND P1.updated_tm = P2.updated_tm
|
||||||
|
AND P1.scrm_label_id = P2.scrm_label_id
|
||||||
|
|
||||||
|
WHERE P2.scrm_label_name IS NULL
|
||||||
|
OR P2.scrm_label_group IS NULL
|
||||||
|
OR P2.scrm_label_type IS NULL
|
||||||
|
OR P2.created_tm IS NULL
|
||||||
|
OR P2.updated_tm IS NULL
|
||||||
|
OR P2.scrm_label_id IS NULL
|
||||||
|
|
||||||
|
;
|
||||||
|
/*将新增数据插入到目标表 */
|
||||||
|
;INSERT INTO :PDMDB.t01_scrm_label (
|
||||||
|
scrm_label_name /*标签名*/
|
||||||
|
,scrm_label_group /*标签分组名*/
|
||||||
|
,scrm_label_type /*标签类型*/
|
||||||
|
,created_tm /*创建时间*/
|
||||||
|
,updated_tm /*上次更新时间*/
|
||||||
|
,scrm_label_id /*标签对应Id*/
|
||||||
|
,Etl_Batch_No /*作业批次号*/
|
||||||
|
,Etl_First_Dt /*最初入库时间*/
|
||||||
|
,Etl_Job /*作业名称*/
|
||||||
|
,Etl_Proc_Dt /*本次入库时间*/
|
||||||
|
,Etl_Tx_Dt /*作业运行时间*/
|
||||||
|
,Src_Sysname /*来源系统*/
|
||||||
|
,Src_Table /*来源表*/
|
||||||
|
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
P1.scrm_label_name /*标签名*/
|
||||||
|
,P1.scrm_label_group /*标签分组名*/
|
||||||
|
,P1.scrm_label_type /*标签类型*/
|
||||||
|
,P1.created_tm /*创建时间*/
|
||||||
|
,P1.updated_tm /*上次更新时间*/
|
||||||
|
,P1.scrm_label_id /*标签对应Id*/
|
||||||
|
,P1.Etl_Batch_No /*作业批次号*/
|
||||||
|
,P1.Etl_First_Dt /*最初入库时间*/
|
||||||
|
,P1.Etl_Job /*作业名称*/
|
||||||
|
,P1.Etl_Proc_Dt /*本次入库时间*/
|
||||||
|
,P1.Etl_Tx_Dt /*作业运行时间*/
|
||||||
|
,P1.Src_Sysname /*来源系统*/
|
||||||
|
,P1.Src_Table /*来源表*/
|
||||||
|
|
||||||
|
FROM t01_scrm_label_agi_INS P1
|
||||||
|
ON CONFLICT ( scrm_label_id)
|
||||||
|
DO UPDATE SET
|
||||||
|
scrm_label_id=excluded.scrm_label_id
|
||||||
|
,scrm_label_name=excluded.scrm_label_name
|
||||||
|
,scrm_label_group=excluded.scrm_label_group
|
||||||
|
,scrm_label_type=excluded.scrm_label_type
|
||||||
|
,created_tm=excluded.created_tm
|
||||||
|
,updated_tm=excluded.updated_tm
|
||||||
|
,Etl_Batch_No=excluded.Etl_Batch_No
|
||||||
|
,Etl_First_Dt=excluded.Etl_First_Dt
|
||||||
|
,Etl_Job=excluded.Etl_Job
|
||||||
|
,Etl_Proc_Dt=excluded.Etl_Proc_Dt
|
||||||
|
,Etl_Tx_Dt=excluded.Etl_Tx_Dt
|
||||||
|
,Src_Sysname=excluded.Src_Sysname
|
||||||
|
,Src_Table=excluded.Src_Table
|
||||||
|
|
||||||
|
|
||||||
|
;
|
||||||
|
/*****程序结束退出 */
|
||||||
|
\q
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
delete from p10_sa.S98_S_scrm_tags
|
||||||
|
;
|
||||||
|
insert into p10_sa.S98_S_scrm_tags
|
||||||
|
( id
|
||||||
|
, name
|
||||||
|
, group_name
|
||||||
|
, type
|
||||||
|
, date_created
|
||||||
|
, last_updated
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
id
|
||||||
|
, name
|
||||||
|
, group_name
|
||||||
|
, type
|
||||||
|
, date_created
|
||||||
|
, last_updated
|
||||||
|
, etl_tx_dt
|
||||||
|
from p00_tal.S98_S_scrm_tags
|
||||||
|
;
|
||||||
|
delete from p12_sfull.S98_S_scrm_tags
|
||||||
|
;
|
||||||
|
;
|
||||||
|
insert into p12_sfull.S98_S_scrm_tags
|
||||||
|
( id
|
||||||
|
, name
|
||||||
|
, group_name
|
||||||
|
, type
|
||||||
|
, date_created
|
||||||
|
, last_updated
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
id
|
||||||
|
, name
|
||||||
|
, group_name
|
||||||
|
, type
|
||||||
|
, date_created
|
||||||
|
, last_updated
|
||||||
|
, etl_tx_dt
|
||||||
|
from p10_sa.S98_S_scrm_tags
|
||||||
|
;
|
||||||
|
\q
|
|
@ -0,0 +1,18 @@
|
||||||
|
|
||||||
|
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_scrm_tags (
|
||||||
|
id TEXT
|
||||||
|
, name TEXT
|
||||||
|
, group_name TEXT
|
||||||
|
, type TEXT
|
||||||
|
, date_created TEXT
|
||||||
|
, last_updated TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'scrm_tags' );
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
|
||||||
|
create table if not exists p10_sa.S98_S_scrm_tags (
|
||||||
|
id TEXT
|
||||||
|
, name TEXT
|
||||||
|
, group_name TEXT
|
||||||
|
, type TEXT
|
||||||
|
, date_created TEXT
|
||||||
|
, last_updated TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.id IS '标签对应Id';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.name IS '标签名';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.group_name IS '标签分组名';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.type IS '标签类型';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.date_created IS '创建时间';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.last_updated IS '上次更新时间';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p10_sa.S98_S_scrm_tags IS '';
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
create table if not exists p12_sfull.S98_S_scrm_tags (
|
||||||
|
id TEXT
|
||||||
|
, name TEXT
|
||||||
|
, group_name TEXT
|
||||||
|
, type TEXT
|
||||||
|
, date_created TEXT
|
||||||
|
, last_updated TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.id IS '标签对应Id';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.name IS '标签名';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.group_name IS '标签分组名';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.type IS '标签类型';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.date_created IS '创建时间';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.last_updated IS '上次更新时间';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p12_sfull.S98_S_scrm_tags IS '';
|
||||||
|
|
|
@ -122,10 +122,70 @@ dag=dag)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
huiju_tags_feign = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='huiju_tags_feign',
|
||||||
|
command='python3 /data/airflow/etl/API/huiju_tags_feign.py',
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
huiju_tags_load = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='huiju_tags_load',
|
||||||
|
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||||
|
params={'my_param':"huiju_tags_load"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
huiju_tags_feign >> huiju_tags_load
|
||||||
|
|
||||||
|
scrm_tags_5000 = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='scrm_tags_5000',
|
||||||
|
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
|
||||||
|
params={'my_param':"S98_S_scrm_tags"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
|
||||||
|
uds_crm_tag_update = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='uds_crm_tag_update',
|
||||||
|
command='python /data/airflow/etl/Python/crm_tag_update.py',
|
||||||
|
params={'my_param':"uds_crm_tag_update"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
t01_scrm_label = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='t01_scrm_label',
|
||||||
|
command='/data/airflow/etl/PDM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
|
||||||
|
params={'my_param':"t01_scrm_label_agi"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
cust_label_rela = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='cust_label_rela',
|
||||||
|
command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
|
||||||
|
params={'my_param':"cust_label_rela_agi"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
customer_labels_load >> tr_custom_labels_8280
|
customer_labels_load >> tr_custom_labels_8280
|
||||||
customer_list_load >> tr_custom_details_5516
|
customer_list_load >> tr_custom_details_5516
|
||||||
tr_custom_labels_8280 >> t01_ccc_cust_label
|
tr_custom_labels_8280 >> t01_ccc_cust_label
|
||||||
tr_custom_details_5516 >> t01_ccc_cust_info
|
tr_custom_details_5516 >> t01_ccc_cust_info
|
||||||
t01_ccc_cust_label >> dysql_ccc_scrm_cust_label_info
|
t01_ccc_cust_label >> dysql_ccc_scrm_cust_label_info
|
||||||
t01_ccc_cust_info >> dysql_ccc_scrm_cust_label_info
|
t01_ccc_cust_info >> dysql_ccc_scrm_cust_label_info
|
||||||
dysql_ccc_scrm_cust_label_info >> task_failed
|
huiju_tags_load >> scrm_tags_5000
|
||||||
|
scrm_tags_5000 >> uds_crm_tag_update
|
||||||
|
dysql_ccc_scrm_cust_label_info >> uds_crm_tag_update
|
||||||
|
t01_scrm_label >> cust_label_rela
|
||||||
|
scrm_tags_5000 >> t01_scrm_label
|
||||||
|
cust_label_rela >> uds_crm_tag_update
|
||||||
|
uds_crm_tag_update >> task_failed
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS p30_common.cust_label_rela;
|
||||||
|
CREATE TABLE IF NOT EXISTS p30_common.cust_label_rela (
|
||||||
|
cust_id varchar(10)
|
||||||
|
, cust_label_id varchar(10)
|
||||||
|
, cust_label_name varchar(50)
|
||||||
|
, cust_external_id varchar(50)
|
||||||
|
, Etl_Batch_No varchar(50)
|
||||||
|
, Etl_First_Dt timestamp(0)
|
||||||
|
, Etl_Job varchar(200)
|
||||||
|
, Etl_Proc_Dt timestamp(0)
|
||||||
|
, Etl_Tx_Dt timestamp(0)
|
||||||
|
, Src_Sysname varchar(50)
|
||||||
|
, Src_Table varchar(50)
|
||||||
|
,primary key( cust_id,cust_label_id )
|
||||||
|
);
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p30_common.cust_label_rela.cust_id IS '客户编号';
|
||||||
|
COMMENT ON COLUMN p30_common.cust_label_rela.cust_label_id IS '客户标签编号';
|
||||||
|
COMMENT ON COLUMN p30_common.cust_label_rela.cust_label_name IS '客户标签名称';
|
||||||
|
COMMENT ON COLUMN p30_common.cust_label_rela.cust_external_id IS '客户外部企业ID';
|
||||||
|
COMMENT ON COLUMN p30_common.cust_label_rela.Etl_Batch_No IS '作业批次号';
|
||||||
|
COMMENT ON COLUMN p30_common.cust_label_rela.Etl_First_Dt IS '最初入库时间';
|
||||||
|
COMMENT ON COLUMN p30_common.cust_label_rela.Etl_Job IS '作业名称';
|
||||||
|
COMMENT ON COLUMN p30_common.cust_label_rela.Etl_Proc_Dt IS '本次入库时间';
|
||||||
|
COMMENT ON COLUMN p30_common.cust_label_rela.Etl_Tx_Dt IS '作业运行时间';
|
||||||
|
COMMENT ON COLUMN p30_common.cust_label_rela.Src_Sysname IS '来源系统';
|
||||||
|
COMMENT ON COLUMN p30_common.cust_label_rela.Src_Table IS '来源表';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p30_common.cust_label_rela IS '客户标签关系表';
|
|
@ -0,0 +1,170 @@
|
||||||
|
/***************************************************************************************************/
|
||||||
|
/*script in Sql, generate by SdmCreateScript 2020(by Qihang Feng, QF255001@TERADATA.COM) */
|
||||||
|
/*VERSION 01.10 revised on 2020-08-25 */
|
||||||
|
/*Brilliance stems from wisdoms. */
|
||||||
|
/*************Head Section**************************************************************************/
|
||||||
|
/*Script Use: Periodically load data to :cust_label_rela(客户标签关系表) */
|
||||||
|
/*Create Date:2024-08-01 10:13:50 */
|
||||||
|
/*SDM Developed By: dev */
|
||||||
|
/*SDM Developed Date: 2024-07-11 */
|
||||||
|
/*SDM Checked By: dev */
|
||||||
|
/*SDM Checked Date: 2024-08-01 */
|
||||||
|
/*Script Developed By: dev */
|
||||||
|
/*Script Checked By: dev */
|
||||||
|
/*Source table 1: :PDMDB.t01_ccc_cust_info */
|
||||||
|
/*Source table 2: :PDMDB.t01_ccc_cust_label */
|
||||||
|
/*Job Type: Inbound transform (Tier 1 to Tier 2) */
|
||||||
|
/*Target Table:cust_label_rela */
|
||||||
|
/*ETL Job Name:cust_label_rela */
|
||||||
|
/*ETL Frequency:Daily */
|
||||||
|
/*ETL Policy:F2 */
|
||||||
|
/********************************************************************************************/
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*创建临时表加载当前数据 */
|
||||||
|
CREATE TEMPORARY TABLE cust_label_rela_agi_CUR_I
|
||||||
|
( LIKE :COMMDB.cust_label_rela)
|
||||||
|
ON COMMIT PRESERVE ROWS;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*创建临时表加载不同数据 */
|
||||||
|
CREATE TEMPORARY TABLE cust_label_rela_agi_INS
|
||||||
|
( LIKE :COMMDB.cust_label_rela)
|
||||||
|
ON COMMIT PRESERVE ROWS;
|
||||||
|
|
||||||
|
|
||||||
|
/*****************************************************************************************************/
|
||||||
|
/* GROUP 1:Source Table:t01_ccc_cust_info*************************************************************/
|
||||||
|
/*****************************************************************************************************/
|
||||||
|
|
||||||
|
INSERT INTO cust_label_rela_agi_CUR_I (
|
||||||
|
cust_id /*客户编号*/
|
||||||
|
,cust_label_id /*客户标签编号*/
|
||||||
|
,cust_label_name /*客户标签名称*/
|
||||||
|
,cust_external_id /*客户外部企业ID*/
|
||||||
|
,Etl_Batch_No /*作业批次号*/
|
||||||
|
,Etl_First_Dt /*最初入库时间*/
|
||||||
|
,Etl_Job /*作业名称*/
|
||||||
|
,Etl_Proc_Dt /*本次入库时间*/
|
||||||
|
,Etl_Tx_Dt /*作业运行时间*/
|
||||||
|
,Src_Sysname /*来源系统*/
|
||||||
|
,Src_Table /*来源表*/
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
coalesce(p0.id,'') /*cust_id*/
|
||||||
|
,coalesce(p0.label_id,'') /*cust_label_id*/
|
||||||
|
,coalesce(p1.label_name,'') /*cust_label_name*/
|
||||||
|
,coalesce(p0.external_id,'') /*cust_external_id*/
|
||||||
|
,0 /*Etl_Batch_No*/
|
||||||
|
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_First_Dt*/
|
||||||
|
,:ETLJOB /*Etl_Job*/
|
||||||
|
,current_timestamp(0) /*Etl_Proc_Dt*/
|
||||||
|
,TO_DATE(:TXDATE,'YYYYMMDD') /*Etl_Tx_Dt*/
|
||||||
|
,Substr('t01_ccc_cust_info',1,3) /*Src_Sysname*/
|
||||||
|
,'t01_ccc_cust_info' /*Src_Table*/
|
||||||
|
|
||||||
|
FROM (select id,unnest(string_to_array(replace(replace(label_ids,'[',''),']',''),',')) label_id,external_id from p20_pdm.t01_ccc_cust_info tcci
|
||||||
|
where length(trim(external_id))>0) p0
|
||||||
|
LEFT JOIN :PDMDB.t01_ccc_cust_label p1
|
||||||
|
ON p0.label_id=p1.label_id
|
||||||
|
|
||||||
|
|
||||||
|
;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*将不同数据插入到临时表 */
|
||||||
|
;INSERT INTO cust_label_rela_agi_INS (
|
||||||
|
cust_label_name /*客户标签名称*/
|
||||||
|
,cust_external_id /*客户外部企业ID*/
|
||||||
|
,cust_id /*客户编号*/
|
||||||
|
,cust_label_id /*客户标签编号*/
|
||||||
|
,Etl_Batch_No /*作业批次号*/
|
||||||
|
,Etl_First_Dt /*最初入库时间*/
|
||||||
|
,Etl_Job /*作业名称*/
|
||||||
|
,Etl_Proc_Dt /*本次入库时间*/
|
||||||
|
,Etl_Tx_Dt /*作业运行时间*/
|
||||||
|
,Src_Sysname /*来源系统*/
|
||||||
|
,Src_Table /*来源表*/
|
||||||
|
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
P1.cust_label_name /*客户标签名称*/
|
||||||
|
,P1.cust_external_id /*客户外部企业ID*/
|
||||||
|
,P1.cust_id /*客户编号*/
|
||||||
|
,P1.cust_label_id /*客户标签编号*/
|
||||||
|
,P1.Etl_Batch_No /*作业批次号*/
|
||||||
|
,P1.Etl_First_Dt /*最初入库时间*/
|
||||||
|
,P1.Etl_Job /*作业名称*/
|
||||||
|
,P1.Etl_Proc_Dt /*本次入库时间*/
|
||||||
|
,P1.Etl_Tx_Dt /*作业运行时间*/
|
||||||
|
,P1.Src_Sysname /*来源系统*/
|
||||||
|
,P1.Src_Table /*来源表*/
|
||||||
|
|
||||||
|
FROM cust_label_rela_agi_CUR_I P1
|
||||||
|
LEFT JOIN :COMMDB.cust_label_rela P2
|
||||||
|
ON P1.cust_label_name = P2.cust_label_name
|
||||||
|
AND P1.cust_external_id = P2.cust_external_id
|
||||||
|
AND P1.cust_id = P2.cust_id
|
||||||
|
AND P1.cust_label_id = P2.cust_label_id
|
||||||
|
|
||||||
|
WHERE P2.cust_label_name IS NULL
|
||||||
|
OR P2.cust_external_id IS NULL
|
||||||
|
OR P2.cust_id IS NULL
|
||||||
|
OR P2.cust_label_id IS NULL
|
||||||
|
|
||||||
|
;
|
||||||
|
/*将新增数据插入到目标表 */
|
||||||
|
;INSERT INTO :COMMDB.cust_label_rela (
|
||||||
|
cust_label_name /*客户标签名称*/
|
||||||
|
,cust_external_id /*客户外部企业ID*/
|
||||||
|
,cust_id /*客户编号*/
|
||||||
|
,cust_label_id /*客户标签编号*/
|
||||||
|
,Etl_Batch_No /*作业批次号*/
|
||||||
|
,Etl_First_Dt /*最初入库时间*/
|
||||||
|
,Etl_Job /*作业名称*/
|
||||||
|
,Etl_Proc_Dt /*本次入库时间*/
|
||||||
|
,Etl_Tx_Dt /*作业运行时间*/
|
||||||
|
,Src_Sysname /*来源系统*/
|
||||||
|
,Src_Table /*来源表*/
|
||||||
|
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
P1.cust_label_name /*客户标签名称*/
|
||||||
|
,P1.cust_external_id /*客户外部企业ID*/
|
||||||
|
,P1.cust_id /*客户编号*/
|
||||||
|
,P1.cust_label_id /*客户标签编号*/
|
||||||
|
,P1.Etl_Batch_No /*作业批次号*/
|
||||||
|
,P1.Etl_First_Dt /*最初入库时间*/
|
||||||
|
,P1.Etl_Job /*作业名称*/
|
||||||
|
,P1.Etl_Proc_Dt /*本次入库时间*/
|
||||||
|
,P1.Etl_Tx_Dt /*作业运行时间*/
|
||||||
|
,P1.Src_Sysname /*来源系统*/
|
||||||
|
,P1.Src_Table /*来源表*/
|
||||||
|
|
||||||
|
FROM cust_label_rela_agi_INS P1
|
||||||
|
ON CONFLICT ( cust_id,cust_label_id)
|
||||||
|
DO UPDATE SET
|
||||||
|
cust_id=excluded.cust_id
|
||||||
|
,cust_label_id=excluded.cust_label_id
|
||||||
|
,cust_label_name=excluded.cust_label_name
|
||||||
|
,cust_external_id=excluded.cust_external_id
|
||||||
|
,Etl_Batch_No=excluded.Etl_Batch_No
|
||||||
|
,Etl_First_Dt=excluded.Etl_First_Dt
|
||||||
|
,Etl_Job=excluded.Etl_Job
|
||||||
|
,Etl_Proc_Dt=excluded.Etl_Proc_Dt
|
||||||
|
,Etl_Tx_Dt=excluded.Etl_Tx_Dt
|
||||||
|
,Src_Sysname=excluded.Src_Sysname
|
||||||
|
,Src_Table=excluded.Src_Table
|
||||||
|
|
||||||
|
|
||||||
|
;
|
||||||
|
/*****程序结束退出 */
|
||||||
|
\q
|
||||||
|
|
|
@ -0,0 +1,75 @@
|
||||||
|
# coding: utf-8
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import psycopg2
|
||||||
|
import uuid
|
||||||
|
import datetime
|
||||||
|
import time
|
||||||
|
import hashlib
|
||||||
|
import time
|
||||||
|
|
||||||
|
#荟聚
|
||||||
|
|
||||||
|
#全局变量,便于参数使用的预设值
|
||||||
|
current_date = datetime.date.today() # 获取当前日期
|
||||||
|
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||||
|
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||||
|
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||||
|
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||||
|
sign_version = 'v2' # 签名版本号,固定值v2
|
||||||
|
nonce = str(uuid.uuid4())
|
||||||
|
|
||||||
|
#获取签名令牌
|
||||||
|
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||||
|
# 按照指定的格式拼接字符串
|
||||||
|
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||||
|
# 使用SHA256算法计算哈希值
|
||||||
|
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||||
|
return sha256_hash
|
||||||
|
|
||||||
|
#获取鉴权token
|
||||||
|
def get_token(url):
|
||||||
|
#请求鉴权接口
|
||||||
|
authRequest=requests.get(url)
|
||||||
|
#解析结果
|
||||||
|
if not authRequest: #若为空时,返回空
|
||||||
|
return
|
||||||
|
auth=json.loads(authRequest.text)
|
||||||
|
return auth
|
||||||
|
|
||||||
|
print('开始加载数据:huiju_tags:荟聚标签信息')
|
||||||
|
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||||
|
|
||||||
|
print('开始请求令牌。')
|
||||||
|
#authRequest=requests.get(authUrl)
|
||||||
|
#auth=json.loads(authRequest.text)
|
||||||
|
auth = get_token(authUrl)
|
||||||
|
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||||
|
i = 0
|
||||||
|
while 'error' in auth and i < 60:
|
||||||
|
time.sleep(60)
|
||||||
|
auth = get_token(authUrl)
|
||||||
|
i = i + 1
|
||||||
|
print('开始请求数据总数。')
|
||||||
|
url='https://api.huiju.cool/v2/tags'
|
||||||
|
header={}
|
||||||
|
body={'access_token':auth['access_token'],'limit':'1000',}
|
||||||
|
dataReqL=requests.get(url,headers=header,params=body)
|
||||||
|
resL=json.loads(dataReqL.text)
|
||||||
|
# print(resL)
|
||||||
|
# dataList=resL['data']
|
||||||
|
dataList=resL
|
||||||
|
total=len(dataList)
|
||||||
|
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||||
|
host="172.17.0.8", port="5432")
|
||||||
|
print('数据库连接成功')
|
||||||
|
dataId=str(uuid.uuid4())
|
||||||
|
print('临时id:'+dataId)
|
||||||
|
json_object = json.dumps(dataList)
|
||||||
|
cur=conn.cursor()
|
||||||
|
sql="update data_api.api_data set is_loaded = '1' where api_id = 'a7cf7f4d-108d-410c-9308-e13dfc56';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||||
|
cur.execute(sql,[dataId,'a7cf7f4d-108d-410c-9308-e13dfc56',json_object,total])
|
||||||
|
conn.commit()
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
|
print('加载数据结束:huiju_tags:荟聚标签信息')
|
|
@ -0,0 +1,42 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
|
||||||
|
DELETE FROM data_api.scrm_tags;
|
||||||
|
|
||||||
|
insert into data_api.scrm_tags (
|
||||||
|
id
|
||||||
|
, name
|
||||||
|
, group_name
|
||||||
|
, type
|
||||||
|
, date_created
|
||||||
|
, last_updated
|
||||||
|
,etl_tx_dt
|
||||||
|
)
|
||||||
|
select
|
||||||
|
case when trim(both from id)='' then null else id::text end id
|
||||||
|
, case when trim(both from name)='' then null else name::text end name
|
||||||
|
, case when trim(both from group_name)='' then null else group_name::text end group_name
|
||||||
|
, case when trim(both from type)='' then null else type::text end type
|
||||||
|
, case when trim(both from date_created)='' then null else date_created::text end date_created
|
||||||
|
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||||
|
,etl_tx_dt
|
||||||
|
from (
|
||||||
|
select
|
||||||
|
(json_array_elements(data::json)::json->>'id') id
|
||||||
|
, (json_array_elements(data::json)::json->>'name') name
|
||||||
|
, (json_array_elements(data::json)::json->>'groupName') group_name
|
||||||
|
, (json_array_elements(data::json)::json->>'type') type
|
||||||
|
, (json_array_elements(data::json)::json->>'dateCreated') date_created
|
||||||
|
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||||
|
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||||
|
from (select * from data_api.api_data
|
||||||
|
WHERE api_id='a7cf7f4d-108d-410c-9308-e13dfc56' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||||
|
|
||||||
|
update data_api.api_data
|
||||||
|
set is_loaded = '1' ,
|
||||||
|
status = '1',
|
||||||
|
request_tm = current_timestamp(0)
|
||||||
|
where api_id='a7cf7f4d-108d-410c-9308-e13dfc56';
|
||||||
|
\q
|
Loading…
Reference in New Issue