add workflow 荟聚API_1,dev

This commit is contained in:
root 2024-04-09 11:42:27 +08:00
parent 2396efdf98
commit 303dbc0e6e
16 changed files with 958 additions and 0 deletions

View File

@ -0,0 +1,58 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
delete from p10_sa.S98_S_scrm_contact
;
insert into p10_sa.S98_S_scrm_contact
( id
, city
, company
, email
, last_updated
, mobile
, name
, province
, date_join
, etl_tx_dt )
select
id
, city
, company
, email
, last_updated
, mobile
, name
, province
, date_join
, etl_tx_dt
from p00_tal.S98_S_scrm_contact
where etl_tx_dt >= CURRENT_DATE;
delete from p12_sfull.S98_S_scrm_contact
where (id) in (select id from p10_sa.S98_S_scrm_contact) ;
;
insert into p12_sfull.S98_S_scrm_contact
( id
, city
, company
, email
, last_updated
, mobile
, name
, province
, date_join
, etl_tx_dt )
select
id
, city
, company
, email
, last_updated
, mobile
, name
, province
, date_join
, etl_tx_dt
from p10_sa.S98_S_scrm_contact
;
\q

View File

@ -0,0 +1,21 @@
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_scrm_contact (
id TEXT
, city TEXT
, company TEXT
, email TEXT
, last_updated TEXT
, mobile TEXT
, name TEXT
, province TEXT
, date_join TEXT
, etl_tx_dt TIMESTAMP
)
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'scrm_contact' );

View File

@ -0,0 +1,55 @@
create table if not exists p10_sa.S98_S_scrm_contact (
id TEXT
, city TEXT
, company TEXT
, email TEXT
, last_updated TEXT
, mobile TEXT
, name TEXT
, province TEXT
, date_join TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact.id IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact.city IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact.company IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact.email IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact.last_updated IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact.mobile IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact.name IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact.province IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact.date_join IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact.etl_tx_dt IS '';
COMMENT ON TABLE p10_sa.S98_S_scrm_contact IS '';
create table if not exists p12_sfull.S98_S_scrm_contact (
id TEXT
, city TEXT
, company TEXT
, email TEXT
, last_updated TEXT
, mobile TEXT
, name TEXT
, province TEXT
, date_join TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact.id IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact.city IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact.company IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact.email IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact.last_updated IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact.mobile IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact.name IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact.province IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact.date_join IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact.etl_tx_dt IS '';
COMMENT ON TABLE p12_sfull.S98_S_scrm_contact IS '';

View File

@ -0,0 +1,58 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
delete from p10_sa.S98_S_scrm_contact_lastupdated
;
insert into p10_sa.S98_S_scrm_contact_lastupdated
( id
, city
, company
, email
, last_updated
, mobile
, name
, province
, date_join
, etl_tx_dt )
select
id
, city
, company
, email
, last_updated
, mobile
, name
, province
, date_join
, etl_tx_dt
from p00_tal.S98_S_scrm_contact_lastupdated
;
delete from p12_sfull.S98_S_scrm_contact_lastupdated
;
;
insert into p12_sfull.S98_S_scrm_contact_lastupdated
( id
, city
, company
, email
, last_updated
, mobile
, name
, province
, date_join
, etl_tx_dt )
select
id
, city
, company
, email
, last_updated
, mobile
, name
, province
, date_join
, etl_tx_dt
from p10_sa.S98_S_scrm_contact_lastupdated
;
\q

View File

@ -0,0 +1,21 @@
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_scrm_contact_lastupdated (
id TEXT
, city TEXT
, company TEXT
, email TEXT
, last_updated TEXT
, mobile TEXT
, name TEXT
, province TEXT
, date_join TEXT
, etl_tx_dt TIMESTAMP
)
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'scrm_contact_lastupdated' );

View File

@ -0,0 +1,55 @@
create table if not exists p10_sa.S98_S_scrm_contact_lastupdated (
id TEXT
, city TEXT
, company TEXT
, email TEXT
, last_updated TEXT
, mobile TEXT
, name TEXT
, province TEXT
, date_join TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_lastupdated.id IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_lastupdated.city IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_lastupdated.company IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_lastupdated.email IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_lastupdated.last_updated IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_lastupdated.mobile IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_lastupdated.name IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_lastupdated.province IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_lastupdated.date_join IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_lastupdated.etl_tx_dt IS '';
COMMENT ON TABLE p10_sa.S98_S_scrm_contact_lastupdated IS '';
create table if not exists p12_sfull.S98_S_scrm_contact_lastupdated (
id TEXT
, city TEXT
, company TEXT
, email TEXT
, last_updated TEXT
, mobile TEXT
, name TEXT
, province TEXT
, date_join TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_lastupdated.id IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_lastupdated.city IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_lastupdated.company IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_lastupdated.email IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_lastupdated.last_updated IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_lastupdated.mobile IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_lastupdated.name IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_lastupdated.province IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_lastupdated.date_join IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_lastupdated.etl_tx_dt IS '';
COMMENT ON TABLE p12_sfull.S98_S_scrm_contact_lastupdated IS '';

View File

@ -0,0 +1,78 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
delete from p10_sa.S98_S_scrm_contact_merge
;
insert into p10_sa.S98_S_scrm_contact_merge
( mergedcustomerid
, id
, customer_id
, date
, event
, target_name
, last_updated
, c_name
, c_type
, contentname
, source
, tag
, c_keyword
, attr2
, etl_tx_dt )
select
mergedcustomerid
, id
, customer_id
, date
, event
, target_name
, last_updated
, c_name
, c_type
, contentname
, source
, tag
, c_keyword
, attr2
, etl_tx_dt
from p00_tal.S98_S_scrm_contact_merge
;
delete from p12_sfull.S98_S_scrm_contact_merge
;
;
insert into p12_sfull.S98_S_scrm_contact_merge
( mergedcustomerid
, id
, customer_id
, date
, event
, target_name
, last_updated
, c_name
, c_type
, contentname
, source
, tag
, c_keyword
, attr2
, etl_tx_dt )
select
mergedcustomerid
, id
, customer_id
, date
, event
, target_name
, last_updated
, c_name
, c_type
, contentname
, source
, tag
, c_keyword
, attr2
, etl_tx_dt
from p10_sa.S98_S_scrm_contact_merge
;
\q

View File

@ -0,0 +1,26 @@
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_scrm_contact_merge (
mergedcustomerid TEXT
, id TEXT
, customer_id TEXT
, date TEXT
, event TEXT
, target_name TEXT
, last_updated TEXT
, c_name TEXT
, c_type TEXT
, contentname TEXT
, source TEXT
, tag TEXT
, c_keyword TEXT
, attr2 TEXT
, etl_tx_dt TIMESTAMP
)
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'scrm_contact_merge' );

View File

@ -0,0 +1,75 @@
create table if not exists p10_sa.S98_S_scrm_contact_merge (
mergedcustomerid TEXT
, id TEXT
, customer_id TEXT
, date TEXT
, event TEXT
, target_name TEXT
, last_updated TEXT
, c_name TEXT
, c_type TEXT
, contentname TEXT
, source TEXT
, tag TEXT
, c_keyword TEXT
, attr2 TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.mergedcustomerid IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.id IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.customer_id IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.date IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.event IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.target_name IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.last_updated IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.c_name IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.c_type IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.contentname IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.source IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.tag IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.c_keyword IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.attr2 IS '';
COMMENT ON COLUMN p10_sa.S98_S_scrm_contact_merge.etl_tx_dt IS '';
COMMENT ON TABLE p10_sa.S98_S_scrm_contact_merge IS '';
create table if not exists p12_sfull.S98_S_scrm_contact_merge (
mergedcustomerid TEXT
, id TEXT
, customer_id TEXT
, date TEXT
, event TEXT
, target_name TEXT
, last_updated TEXT
, c_name TEXT
, c_type TEXT
, contentname TEXT
, source TEXT
, tag TEXT
, c_keyword TEXT
, attr2 TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.mergedcustomerid IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.id IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.customer_id IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.date IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.event IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.target_name IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.last_updated IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.c_name IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.c_type IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.contentname IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.source IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.tag IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.c_keyword IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.attr2 IS '';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_contact_merge.etl_tx_dt IS '';
COMMENT ON TABLE p12_sfull.S98_S_scrm_contact_merge IS '';

View File

@ -0,0 +1,72 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
import time
import hashlib
import time
#全局变量,便于参数使用的预设值
current_date = datetime.date.today() # 获取当前日期
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
timestamp = time.time() # 为Unix time即从"1970-01-01 00:00:00"至今的秒数;
sign_version = 'v2' # 签名版本号固定值v2
nonce = str(uuid.uuid4())
#获取签名令牌
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
# 按照指定的格式拼接字符串
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
# 使用SHA256算法计算哈希值
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
return sha256_hash
#获取鉴权token
def get_token(url):
#请求鉴权接口
authRequest=requests.get(url)
#解析结果
if not authRequest: #若为空时,返回空
return
auth=json.loads(authRequest.text)
return auth
print('开始加载数据custom_events_update:合并SCRM CONTACT')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
#authRequest=requests.get(authUrl)
#auth=json.loads(authRequest.text)
auth = get_token(authUrl)
#循环判断auth是否为空若为空等待30s后重新请求
i = 0
while not auth['access_token'] and i < 60:
time.sleep(60)
auth = get_token(authUrl)
i = i + 1
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/customerEvents'
header={}
body={'access_token':auth['access_token'],'limit':'','sort':'lastUpdated','lastUpdated[le]':formatted_current_date,'lastUpdated[ge]':formatted_previous_date,}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = 'ff5c7bf6-0d18-4201-9501-8fdd6152';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'ff5c7bf6-0d18-4201-9501-8fdd6152',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束custom_events_update:合并SCRM CONTACT')

View File

@ -0,0 +1,66 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.scrm_contact_merge;
insert into data_api.scrm_contact_merge (
mergedCustomerId
, id
, customer_id
, date
, event
, target_name
, last_updated
, c_name
, c_type
, contentName
, source
, tag
, c_keyword
, attr2
,etl_tx_dt
)
select
case when trim(both from mergedCustomerId)='' then null else mergedCustomerId::text end mergedCustomerId
, case when trim(both from id)='' then null else id::text end id
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
, case when trim(both from date)='' then null else date::text end date
, case when trim(both from event)='' then null else event::text end event
, case when trim(both from target_name)='' then null else target_name::text end target_name
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
, case when trim(both from c_name)='' then null else c_name::text end c_name
, case when trim(both from c_type)='' then null else c_type::text end c_type
, case when trim(both from contentName)='' then null else contentName::text end contentName
, case when trim(both from source)='' then null else source::text end source
, case when trim(both from tag)='' then null else tag::text end tag
, case when trim(both from c_keyword)='' then null else c_keyword::text end c_keyword
, case when trim(both from attr2)='' then null else attr2::text end attr2
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'mergedCustomerId') mergedCustomerId
, (json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'customerId') customer_id
, (json_array_elements(data::json)::json->>'date') date
, (json_array_elements(data::json)::json->>'event') event
, (json_array_elements(data::json)::json->>'targetName') target_name
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
, (json_array_elements(data::json)::json->>'c_name') c_name
, (json_array_elements(data::json)::json->>'c_type') c_type
, (json_array_elements(data::json)::json->>'contentName') contentName
, (json_array_elements(data::json)::json->>'source') source
, (json_array_elements(data::json)::json->>'tag') tag
, (json_array_elements(data::json)::json->>'c_keyword') c_keyword
, (json_array_elements(data::json)::json->>'attr2') attr2
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='ff5c7bf6-0d18-4201-9501-8fdd6152' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='ff5c7bf6-0d18-4201-9501-8fdd6152';
\q

View File

@ -0,0 +1,72 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
import time
import hashlib
import time
#全局变量,便于参数使用的预设值
current_date = datetime.date.today() # 获取当前日期
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
timestamp = time.time() # 为Unix time即从"1970-01-01 00:00:00"至今的秒数;
sign_version = 'v2' # 签名版本号固定值v2
nonce = str(uuid.uuid4())
#获取签名令牌
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
# 按照指定的格式拼接字符串
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
# 使用SHA256算法计算哈希值
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
return sha256_hash
#获取鉴权token
def get_token(url):
#请求鉴权接口
authRequest=requests.get(url)
#解析结果
if not authRequest: #若为空时,返回空
return
auth=json.loads(authRequest.text)
return auth
print('开始加载数据update_scrm_contact:更新scrm CONTACT')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
#authRequest=requests.get(authUrl)
#auth=json.loads(authRequest.text)
auth = get_token(authUrl)
#循环判断auth是否为空若为空等待30s后重新请求
i = 0
while not auth['access_token'] and i < 60:
time.sleep(60)
auth = get_token(authUrl)
i = i + 1
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/customers'
header={}
body={'access_token':auth['access_token'],'limit':'2000','sort':'lastUpdated','lastUpdated[le]':formatted_current_date,'lastUpdated[ge]':formatted_previous_date,}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = '03d918ea-00d1-448a-8b9b-6a0b1a33';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'03d918ea-00d1-448a-8b9b-6a0b1a33',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束update_scrm_contact:更新scrm CONTACT')

View File

@ -0,0 +1,51 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.scrm_contact_lastUpdated;
insert into data_api.scrm_contact_lastUpdated (
id
, city
, company
, email
, last_updated
, mobile
, name
, province
, date_join
,etl_tx_dt
)
select
case when trim(both from id)='' then null else id::text end id
, case when trim(both from city)='' then null else city::text end city
, case when trim(both from company)='' then null else company::text end company
, case when trim(both from email)='' then null else email::text end email
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
, case when trim(both from mobile)='' then null else mobile::text end mobile
, case when trim(both from name)='' then null else name::text end name
, case when trim(both from province)='' then null else province::text end province
, case when trim(both from date_join)='' then null else date_join::text end date_join
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'city') city
, (json_array_elements(data::json)::json->>'company') company
, (json_array_elements(data::json)::json->>'email') email
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
, (json_array_elements(data::json)::json->>'mobile') mobile
, (json_array_elements(data::json)::json->>'name') name
, (json_array_elements(data::json)::json->>'province') province
, (json_array_elements(data::json)::json->>'dateJoin') date_join
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='03d918ea-00d1-448a-8b9b-6a0b1a33' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='03d918ea-00d1-448a-8b9b-6a0b1a33';
\q

View File

@ -0,0 +1,127 @@
#!/usr/bin/python
# -*- encoding=utf-8 -*-
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import json
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
sshHook = SSHHook(ssh_conn_id ='ssh_air')
default_args = {
'owner': 'info@idgvalue.com',
'email_on_failure': True,
'email_on_retry':True,
'start_date': datetime(2024, 1, 1),
'depends_on_past': False,
'retries': 6,
'retry_delay': timedelta(minutes=10),
}
dag = DAG('wf_dag_tk_api_1', default_args=default_args,
schedule_interval="5 0 * * *",
catchup=False,
dagrun_timeout=timedelta(minutes=160),
max_active_runs=3)
task_failed = EmailOperator (
dag=dag,
trigger_rule=TriggerRule.ONE_FAILED,
task_id="task_failed",
to=["info@idgvalue.com"],
cc=[""],
subject="tk_api_1_failed",
html_content='<h3>您好tk_api_1作业失败请及时处理" </h3>')
update_scrm_contact_feign = SSHOperator(
ssh_hook=sshHook,
task_id='update_scrm_contact_feign',
command='python3 /data/airflow/etl/API/update_scrm_contact_feign.py',
depends_on_past=False,
retries=3,
dag=dag)
update_scrm_contact_load = SSHOperator(
ssh_hook=sshHook,
task_id='update_scrm_contact_load',
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
params={'my_param':"update_scrm_contact_load"},
depends_on_past=False,
retries=3,
dag=dag)
update_scrm_contact_feign >> update_scrm_contact_load
scrm_contact_feign = SSHOperator(
ssh_hook=sshHook,
task_id='scrm_contact_feign',
command='python3 /data/airflow/etl/API/scrm_contact_feign.py',
depends_on_past=False,
retries=3,
dag=dag)
scrm_contact_load = SSHOperator(
ssh_hook=sshHook,
task_id='scrm_contact_load',
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
params={'my_param':"scrm_contact_load"},
depends_on_past=False,
retries=3,
dag=dag)
scrm_contact_feign >> scrm_contact_load
custom_events_update_feign = SSHOperator(
ssh_hook=sshHook,
task_id='custom_events_update_feign',
command='python3 /data/airflow/etl/API/custom_events_update_feign.py',
depends_on_past=False,
retries=3,
dag=dag)
custom_events_update_load = SSHOperator(
ssh_hook=sshHook,
task_id='custom_events_update_load',
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
params={'my_param':"custom_events_update_load"},
depends_on_past=False,
retries=3,
dag=dag)
custom_events_update_feign >> custom_events_update_load
scrm_contact_1129 = SSHOperator(
ssh_hook=sshHook,
task_id='scrm_contact_1129',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"S98_S_scrm_contact"},
depends_on_past=False,
retries=3,
dag=dag)
scrm_contact_merge_6671 = SSHOperator(
ssh_hook=sshHook,
task_id='scrm_contact_merge_6671',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"S98_S_scrm_contact_merge"},
depends_on_past=False,
retries=3,
dag=dag)
scrm_contact_lastupdated_4112 = SSHOperator(
ssh_hook=sshHook,
task_id='scrm_contact_lastupdated_4112',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"S98_S_scrm_contact_lastupdated"},
depends_on_past=False,
retries=3,
dag=dag)
custom_events_update_load >> scrm_contact_merge_6671
update_scrm_contact_load >> scrm_contact_lastupdated_4112
scrm_contact_load >> scrm_contact_1129
scrm_contact_1129 >> task_failed

View File

@ -0,0 +1,72 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
import time
import hashlib
import time
#全局变量,便于参数使用的预设值
current_date = datetime.date.today() # 获取当前日期
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
timestamp = time.time() # 为Unix time即从"1970-01-01 00:00:00"至今的秒数;
sign_version = 'v2' # 签名版本号固定值v2
nonce = str(uuid.uuid4())
#获取签名令牌
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
# 按照指定的格式拼接字符串
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
# 使用SHA256算法计算哈希值
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
return sha256_hash
#获取鉴权token
def get_token(url):
#请求鉴权接口
authRequest=requests.get(url)
#解析结果
if not authRequest: #若为空时,返回空
return
auth=json.loads(authRequest.text)
return auth
print('开始加载数据scrm_contact:获取SCRM contact')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
#authRequest=requests.get(authUrl)
#auth=json.loads(authRequest.text)
auth = get_token(authUrl)
#循环判断auth是否为空若为空等待30s后重新请求
i = 0
while not auth['access_token'] and i < 60:
time.sleep(60)
auth = get_token(authUrl)
i = i + 1
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/customers'
header={}
body={'access_token':auth['access_token'],'sort':'dateJoin','dateJoin[ge]':formatted_previous_date,'limit':'2000','dateJoin[le]':formatted_current_date,}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = 'cab1a761-b8e5-4db4-a769-88db3ec1';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'cab1a761-b8e5-4db4-a769-88db3ec1',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束scrm_contact:获取SCRM contact')

View File

@ -0,0 +1,51 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.scrm_contact;
insert into data_api.scrm_contact (
id
, city
, company
, email
, last_updated
, mobile
, name
, province
, date_join
,etl_tx_dt
)
select
case when trim(both from id)='' then null else id::text end id
, case when trim(both from city)='' then null else city::text end city
, case when trim(both from company)='' then null else company::text end company
, case when trim(both from email)='' then null else email::text end email
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
, case when trim(both from mobile)='' then null else mobile::text end mobile
, case when trim(both from name)='' then null else name::text end name
, case when trim(both from province)='' then null else province::text end province
, case when trim(both from date_join)='' then null else date_join::text end date_join
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'city') city
, (json_array_elements(data::json)::json->>'company') company
, (json_array_elements(data::json)::json->>'email') email
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
, (json_array_elements(data::json)::json->>'mobile') mobile
, (json_array_elements(data::json)::json->>'name') name
, (json_array_elements(data::json)::json->>'province') province
, (json_array_elements(data::json)::json->>'dateJoin') date_join
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='cab1a761-b8e5-4db4-a769-88db3ec1' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='cab1a761-b8e5-4db4-a769-88db3ec1';
\q