add workflow 荟聚API,dev

This commit is contained in:
root 2024-02-01 10:52:10 +08:00
parent ef53aa7083
commit d7bd8bb95e
18 changed files with 1137 additions and 0 deletions

View File

@ -0,0 +1,41 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
# 获取当前日期
current_date = datetime.date.today()
previous_date = current_date - datetime.timedelta(days=365)
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ")
print('开始加载数据custom_events_5:获取线索事件_c_minipro_page_view')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
authRequest=requests.get(authUrl)
auth=json.loads(authRequest.text)
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/customerEvents'
header={}
body={'access_token':auth['access_token'],'limit':'2000','event':'c_minipro_page_view','sort':'date','date[ge]':formatted_previous_date,'date[le]':formatted_current_date,}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = 'a71a43ff-efcd-4427-8dbc-ed2686e4';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'a71a43ff-efcd-4427-8dbc-ed2686e4',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束custom_events_5:获取线索事件_c_minipro_page_view')

View File

@ -0,0 +1,93 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.customer_events_c_minipro_page_view;
insert into data_api.customer_events_c_minipro_page_view (
browser
, browser_version
, c_name
, c_source_content
, c_url
, customer_id
, customer_id_str
, date
, event
, external_id
, id
, id_str
, ip
, ip_city
, ip_country
, ip_province
, last_updated
, os
, os_version
, platform
, target_id
, target_name
, url
,etl_tx_dt
)
select
case when trim(both from browser)='' then null else browser::text end browser
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
, case when trim(both from c_name)='' then null else c_name::text end c_name
, case when trim(both from c_source_content)='' then null else c_source_content::text end c_source_content
, case when trim(both from c_url)='' then null else c_url::text end c_url
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
, case when trim(both from date)='' then null else date::text end date
, case when trim(both from event)='' then null else event::text end event
, case when trim(both from external_id)='' then null else external_id::text end external_id
, case when trim(both from id)='' then null else id::text end id
, case when trim(both from id_str)='' then null else id_str::text end id_str
, case when trim(both from ip)='' then null else ip::text end ip
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
, case when trim(both from os)='' then null else os::text end os
, case when trim(both from os_version)='' then null else os_version::text end os_version
, case when trim(both from platform)='' then null else platform::text end platform
, case when trim(both from target_id)='' then null else target_id::text end target_id
, case when trim(both from target_name)='' then null else target_name::text end target_name
, case when trim(both from url)='' then null else url::text end url
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'browser') browser
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
, (json_array_elements(data::json)::json->>'c_name') c_name
, (json_array_elements(data::json)::json->>'c_sourceContent') c_source_content
, (json_array_elements(data::json)::json->>'c_url') c_url
, (json_array_elements(data::json)::json->>'customerId') customer_id
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
, (json_array_elements(data::json)::json->>'date') date
, (json_array_elements(data::json)::json->>'event') event
, (json_array_elements(data::json)::json->>'externalId') external_id
, (json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'idStr') id_str
, (json_array_elements(data::json)::json->>'ip') ip
, (json_array_elements(data::json)::json->>'ipCity') ip_city
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
, (json_array_elements(data::json)::json->>'os') os
, (json_array_elements(data::json)::json->>'osVersion') os_version
, (json_array_elements(data::json)::json->>'platform') platform
, (json_array_elements(data::json)::json->>'targetId') target_id
, (json_array_elements(data::json)::json->>'targetName') target_name
, (json_array_elements(data::json)::json->>'url') url
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='a71a43ff-efcd-4427-8dbc-ed2686e4' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='a71a43ff-efcd-4427-8dbc-ed2686e4';
\q

View File

@ -0,0 +1,41 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
# 获取当前日期
current_date = datetime.date.today()
previous_date = current_date - datetime.timedelta(days=365)
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ")
print('开始加载数据custom_events_8:获取线索事件_click_link_in_page')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
authRequest=requests.get(authUrl)
auth=json.loads(authRequest.text)
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/customerEvents'
header={}
body={'sort':'date','date[le]':formatted_current_date,'limit':'2000','event':'click_link_in_page','access_token':auth['access_token'],'date[ge]':formatted_previous_date,}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = 'b20bb19a-6ace-4d10-ab2b-4cdb3c0a';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'b20bb19a-6ace-4d10-ab2b-4cdb3c0a',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束custom_events_8:获取线索事件_click_link_in_page')

View File

@ -0,0 +1,102 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.customer_events_click_link_in_page;
insert into data_api.customer_events_click_link_in_page (
browser
, browser_version
, customer_id
, customer_id_str
, date
, device
, event
, external_id
, id
, id_str
, ip
, ip_city
, ip_country
, ip_province
, last_updated
, os
, os_version
, page_id
, page_type
, platform
, score
, screen_height
, screen_width
, target_id
, target_name
, url
,etl_tx_dt
)
select
case when trim(both from browser)='' then null else browser::text end browser
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
, case when trim(both from date)='' then null else date::text end date
, case when trim(both from device)='' then null else device::text end device
, case when trim(both from event)='' then null else event::text end event
, case when trim(both from external_id)='' then null else external_id::text end external_id
, case when trim(both from id)='' then null else id::text end id
, case when trim(both from id_str)='' then null else id_str::text end id_str
, case when trim(both from ip)='' then null else ip::text end ip
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
, case when trim(both from os)='' then null else os::text end os
, case when trim(both from os_version)='' then null else os_version::text end os_version
, case when trim(both from page_id)='' then null else page_id::text end page_id
, case when trim(both from page_type)='' then null else page_type::text end page_type
, case when trim(both from platform)='' then null else platform::text end platform
, case when trim(both from score)='' then null else score::text end score
, case when trim(both from screen_height)='' then null else screen_height::text end screen_height
, case when trim(both from screen_width)='' then null else screen_width::text end screen_width
, case when trim(both from target_id)='' then null else target_id::text end target_id
, case when trim(both from target_name)='' then null else target_name::text end target_name
, case when trim(both from url)='' then null else url::text end url
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'browser') browser
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
, (json_array_elements(data::json)::json->>'customerId') customer_id
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
, (json_array_elements(data::json)::json->>'date') date
, (json_array_elements(data::json)::json->>'device') device
, (json_array_elements(data::json)::json->>'event') event
, (json_array_elements(data::json)::json->>'externalId') external_id
, (json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'idStr') id_str
, (json_array_elements(data::json)::json->>'ip') ip
, (json_array_elements(data::json)::json->>'ipCity') ip_city
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
, (json_array_elements(data::json)::json->>'os') os
, (json_array_elements(data::json)::json->>'osVersion') os_version
, (json_array_elements(data::json)::json->>'pageId') page_id
, (json_array_elements(data::json)::json->>'pageType') page_type
, (json_array_elements(data::json)::json->>'platform') platform
, (json_array_elements(data::json)::json->>'score') score
, (json_array_elements(data::json)::json->>'screenHeight') screen_height
, (json_array_elements(data::json)::json->>'screenWidth') screen_width
, (json_array_elements(data::json)::json->>'targetId') target_id
, (json_array_elements(data::json)::json->>'targetName') target_name
, (json_array_elements(data::json)::json->>'url') url
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='b20bb19a-6ace-4d10-ab2b-4cdb3c0a' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='b20bb19a-6ace-4d10-ab2b-4cdb3c0a';
\q

View File

@ -0,0 +1,41 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
# 获取当前日期
current_date = datetime.date.today()
previous_date = current_date - datetime.timedelta(days=365)
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ")
print('开始加载数据custom_events_1:获取线索事件_open_app')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
authRequest=requests.get(authUrl)
auth=json.loads(authRequest.text)
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/customerEvents'
header={}
body={'access_token':auth['access_token'],'sort':'date','limit':'2000','date[ge]':formatted_previous_date,'date[le]':formatted_current_date,'event':'open_app',}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = '2433868c-3416-4475-884d-11e687a6';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'2433868c-3416-4475-884d-11e687a6',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束custom_events_1:获取线索事件_open_app')

View File

@ -0,0 +1,102 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.customer_events_open_app;
insert into data_api.customer_events_open_app (
browser
, browser_version
, customer_id
, customer_id_str
, date
, device
, event
, external_id
, id
, id_str
, ip
, ip_city
, ip_country
, ip_county
, ip_province
, last_updated
, os
, os_version
, page_id
, page_type
, platform
, screen_height
, screen_width
, target_id
, target_name
, url
,etl_tx_dt
)
select
case when trim(both from browser)='' then null else browser::text end browser
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
, case when trim(both from date)='' then null else date::text end date
, case when trim(both from device)='' then null else device::text end device
, case when trim(both from event)='' then null else event::text end event
, case when trim(both from external_id)='' then null else external_id::text end external_id
, case when trim(both from id)='' then null else id::text end id
, case when trim(both from id_str)='' then null else id_str::text end id_str
, case when trim(both from ip)='' then null else ip::text end ip
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
, case when trim(both from ip_county)='' then null else ip_county::text end ip_county
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
, case when trim(both from os)='' then null else os::text end os
, case when trim(both from os_version)='' then null else os_version::text end os_version
, case when trim(both from page_id)='' then null else page_id::text end page_id
, case when trim(both from page_type)='' then null else page_type::text end page_type
, case when trim(both from platform)='' then null else platform::text end platform
, case when trim(both from screen_height)='' then null else screen_height::text end screen_height
, case when trim(both from screen_width)='' then null else screen_width::text end screen_width
, case when trim(both from target_id)='' then null else target_id::text end target_id
, case when trim(both from target_name)='' then null else target_name::text end target_name
, case when trim(both from url)='' then null else url::text end url
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'browser') browser
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
, (json_array_elements(data::json)::json->>'customerId') customer_id
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
, (json_array_elements(data::json)::json->>'date') date
, (json_array_elements(data::json)::json->>'device') device
, (json_array_elements(data::json)::json->>'event') event
, (json_array_elements(data::json)::json->>'externalId') external_id
, (json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'idStr') id_str
, (json_array_elements(data::json)::json->>'ip') ip
, (json_array_elements(data::json)::json->>'ipCity') ip_city
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
, (json_array_elements(data::json)::json->>'ipCounty') ip_county
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
, (json_array_elements(data::json)::json->>'os') os
, (json_array_elements(data::json)::json->>'osVersion') os_version
, (json_array_elements(data::json)::json->>'pageId') page_id
, (json_array_elements(data::json)::json->>'pageType') page_type
, (json_array_elements(data::json)::json->>'platform') platform
, (json_array_elements(data::json)::json->>'screenHeight') screen_height
, (json_array_elements(data::json)::json->>'screenWidth') screen_width
, (json_array_elements(data::json)::json->>'targetId') target_id
, (json_array_elements(data::json)::json->>'targetName') target_name
, (json_array_elements(data::json)::json->>'url') url
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='2433868c-3416-4475-884d-11e687a6' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='2433868c-3416-4475-884d-11e687a6';
\q

View File

@ -0,0 +1,41 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
# 获取当前日期
current_date = datetime.date.today()
previous_date = current_date - datetime.timedelta(days=365)
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ")
print('开始加载数据custom_events_2:获取线索事件_open_content_page')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
authRequest=requests.get(authUrl)
auth=json.loads(authRequest.text)
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/customerEvents'
header={}
body={'access_token':auth['access_token'],'limit':'2000','event':'open_content_page','sort':'date','date[ge]':formatted_previous_date,'date[le]':formatted_current_date,}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = '43edfb13-8a42-4152-9dc8-d5feb3c8';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'43edfb13-8a42-4152-9dc8-d5feb3c8',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束custom_events_2:获取线索事件_open_content_page')

View File

@ -0,0 +1,102 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.customer_events_open_content_page;
insert into data_api.customer_events_open_content_page (
browser
, browser_version
, customer_id
, customer_id_str
, date
, device
, event
, external_id
, id
, id_str
, ip
, ip_city
, ip_country
, ip_province
, last_updated
, os
, os_version
, page_id
, page_type
, platform
, screen_height
, screen_width
, short_id
, target_id
, target_name
, url
,etl_tx_dt
)
select
case when trim(both from browser)='' then null else browser::text end browser
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
, case when trim(both from date)='' then null else date::text end date
, case when trim(both from device)='' then null else device::text end device
, case when trim(both from event)='' then null else event::text end event
, case when trim(both from external_id)='' then null else external_id::text end external_id
, case when trim(both from id)='' then null else id::text end id
, case when trim(both from id_str)='' then null else id_str::text end id_str
, case when trim(both from ip)='' then null else ip::text end ip
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
, case when trim(both from os)='' then null else os::text end os
, case when trim(both from os_version)='' then null else os_version::text end os_version
, case when trim(both from page_id)='' then null else page_id::text end page_id
, case when trim(both from page_type)='' then null else page_type::text end page_type
, case when trim(both from platform)='' then null else platform::text end platform
, case when trim(both from screen_height)='' then null else screen_height::text end screen_height
, case when trim(both from screen_width)='' then null else screen_width::text end screen_width
, case when trim(both from short_id)='' then null else short_id::text end short_id
, case when trim(both from target_id)='' then null else target_id::text end target_id
, case when trim(both from target_name)='' then null else target_name::text end target_name
, case when trim(both from url)='' then null else url::text end url
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'browser') browser
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
, (json_array_elements(data::json)::json->>'customerId') customer_id
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
, (json_array_elements(data::json)::json->>'date') date
, (json_array_elements(data::json)::json->>'device') device
, (json_array_elements(data::json)::json->>'event') event
, (json_array_elements(data::json)::json->>'externalId') external_id
, (json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'idStr') id_str
, (json_array_elements(data::json)::json->>'ip') ip
, (json_array_elements(data::json)::json->>'ipCity') ip_city
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
, (json_array_elements(data::json)::json->>'os') os
, (json_array_elements(data::json)::json->>'osVersion') os_version
, (json_array_elements(data::json)::json->>'pageId') page_id
, (json_array_elements(data::json)::json->>'pageType') page_type
, (json_array_elements(data::json)::json->>'platform') platform
, (json_array_elements(data::json)::json->>'screenHeight') screen_height
, (json_array_elements(data::json)::json->>'screenWidth') screen_width
, (json_array_elements(data::json)::json->>'shortId') short_id
, (json_array_elements(data::json)::json->>'targetId') target_id
, (json_array_elements(data::json)::json->>'targetName') target_name
, (json_array_elements(data::json)::json->>'url') url
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='43edfb13-8a42-4152-9dc8-d5feb3c8' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='43edfb13-8a42-4152-9dc8-d5feb3c8';
\q

View File

@ -0,0 +1,41 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
# 获取当前日期
current_date = datetime.date.today()
previous_date = current_date - datetime.timedelta(days=365)
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ")
print('开始加载数据custom_events_3:获取线索事件_open_page')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
authRequest=requests.get(authUrl)
auth=json.loads(authRequest.text)
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/customerEvents'
header={}
body={'date[le]':formatted_current_date,'event':'open_page','access_token':auth['access_token'],'limit':'2000','sort':'date','date[ge]':formatted_previous_date,}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = 'a2b284c3-322f-4bc0-89ff-414aa66a';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'a2b284c3-322f-4bc0-89ff-414aa66a',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束custom_events_3:获取线索事件_open_page')

View File

@ -0,0 +1,120 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.customer_events_open_page;
insert into data_api.customer_events_open_page (
browser
, browser_version
, customer_id
, customer_id_str
, date
, device
, domain
, event
, external_id
, id
, id_str
, ip
, ip_city
, ip_country
, ip_province
, last_updated
, os
, os_version
, page_id
, page_type
, path
, platform
, root_domain
, score
, screen_height
, screen_width
, short_id
, source
, source1
, target_id
, target_name
, url
,etl_tx_dt
)
select
case when trim(both from browser)='' then null else browser::text end browser
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
, case when trim(both from date)='' then null else date::text end date
, case when trim(both from device)='' then null else device::text end device
, case when trim(both from domain)='' then null else domain::text end domain
, case when trim(both from event)='' then null else event::text end event
, case when trim(both from external_id)='' then null else external_id::text end external_id
, case when trim(both from id)='' then null else id::text end id
, case when trim(both from id_str)='' then null else id_str::text end id_str
, case when trim(both from ip)='' then null else ip::text end ip
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
, case when trim(both from os)='' then null else os::text end os
, case when trim(both from os_version)='' then null else os_version::text end os_version
, case when trim(both from page_id)='' then null else page_id::text end page_id
, case when trim(both from page_type)='' then null else page_type::text end page_type
, case when trim(both from path)='' then null else path::text end path
, case when trim(both from platform)='' then null else platform::text end platform
, case when trim(both from root_domain)='' then null else root_domain::text end root_domain
, case when trim(both from score)='' then null else score::text end score
, case when trim(both from screen_height)='' then null else screen_height::text end screen_height
, case when trim(both from screen_width)='' then null else screen_width::text end screen_width
, case when trim(both from short_id)='' then null else short_id::text end short_id
, case when trim(both from source)='' then null else source::text end source
, case when trim(both from source1)='' then null else source1::text end source1
, case when trim(both from target_id)='' then null else target_id::text end target_id
, case when trim(both from target_name)='' then null else target_name::text end target_name
, case when trim(both from url)='' then null else url::text end url
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'browser') browser
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
, (json_array_elements(data::json)::json->>'customerId') customer_id
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
, (json_array_elements(data::json)::json->>'date') date
, (json_array_elements(data::json)::json->>'device') device
, (json_array_elements(data::json)::json->>'domain') domain
, (json_array_elements(data::json)::json->>'event') event
, (json_array_elements(data::json)::json->>'externalId') external_id
, (json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'idStr') id_str
, (json_array_elements(data::json)::json->>'ip') ip
, (json_array_elements(data::json)::json->>'ipCity') ip_city
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
, (json_array_elements(data::json)::json->>'os') os
, (json_array_elements(data::json)::json->>'osVersion') os_version
, (json_array_elements(data::json)::json->>'pageId') page_id
, (json_array_elements(data::json)::json->>'pageType') page_type
, (json_array_elements(data::json)::json->>'path') path
, (json_array_elements(data::json)::json->>'platform') platform
, (json_array_elements(data::json)::json->>'rootDomain') root_domain
, (json_array_elements(data::json)::json->>'score') score
, (json_array_elements(data::json)::json->>'screenHeight') screen_height
, (json_array_elements(data::json)::json->>'screenWidth') screen_width
, (json_array_elements(data::json)::json->>'shortId') short_id
, (json_array_elements(data::json)::json->>'source') source
, (json_array_elements(data::json)::json->>'source1') source1
, (json_array_elements(data::json)::json->>'targetId') target_id
, (json_array_elements(data::json)::json->>'targetName') target_name
, (json_array_elements(data::json)::json->>'url') url
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='a2b284c3-322f-4bc0-89ff-414aa66a' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='a2b284c3-322f-4bc0-89ff-414aa66a';
\q

View File

@ -0,0 +1,41 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
# 获取当前日期
current_date = datetime.date.today()
previous_date = current_date - datetime.timedelta(days=365)
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ")
print('开始加载数据custom_events_6:获取线索事件_submit_form')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
authRequest=requests.get(authUrl)
auth=json.loads(authRequest.text)
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/customerEvents'
header={}
body={'access_token':auth['access_token'],'event':'submit_form','limit':'2000','sort':'date','date[ge]':formatted_previous_date,'date[le]':formatted_current_date,}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = 'c6e5b45f-35f5-442b-84be-35ff2e76';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'c6e5b45f-35f5-442b-84be-35ff2e76',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束custom_events_6:获取线索事件_submit_form')

View File

@ -0,0 +1,96 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.customer_events_submit_form;
insert into data_api.customer_events_submit_form (
browser
, browser_version
, customer_id
, customer_id_str
, date
, event
, external_id
, id
, id_str
, ip
, ip_city
, ip_country
, ip_county
, ip_province
, last_updated
, os
, os_version
, page_id
, page_type
, platform
, score
, submit_id
, target_id
, target_name
,etl_tx_dt
)
select
case when trim(both from browser)='' then null else browser::text end browser
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
, case when trim(both from date)='' then null else date::text end date
, case when trim(both from event)='' then null else event::text end event
, case when trim(both from external_id)='' then null else external_id::text end external_id
, case when trim(both from id)='' then null else id::text end id
, case when trim(both from id_str)='' then null else id_str::text end id_str
, case when trim(both from ip)='' then null else ip::text end ip
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
, case when trim(both from ip_county)='' then null else ip_county::text end ip_county
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
, case when trim(both from os)='' then null else os::text end os
, case when trim(both from os_version)='' then null else os_version::text end os_version
, case when trim(both from page_id)='' then null else page_id::text end page_id
, case when trim(both from page_type)='' then null else page_type::text end page_type
, case when trim(both from platform)='' then null else platform::text end platform
, case when trim(both from score)='' then null else score::text end score
, case when trim(both from submit_id)='' then null else submit_id::text end submit_id
, case when trim(both from target_id)='' then null else target_id::text end target_id
, case when trim(both from target_name)='' then null else target_name::text end target_name
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'browser') browser
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
, (json_array_elements(data::json)::json->>'customerId') customer_id
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
, (json_array_elements(data::json)::json->>'date') date
, (json_array_elements(data::json)::json->>'event') event
, (json_array_elements(data::json)::json->>'externalId') external_id
, (json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'idStr') id_str
, (json_array_elements(data::json)::json->>'ip') ip
, (json_array_elements(data::json)::json->>'ipCity') ip_city
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
, (json_array_elements(data::json)::json->>'ipCounty') ip_county
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
, (json_array_elements(data::json)::json->>'os') os
, (json_array_elements(data::json)::json->>'osVersion') os_version
, (json_array_elements(data::json)::json->>'pageId') page_id
, (json_array_elements(data::json)::json->>'pageType') page_type
, (json_array_elements(data::json)::json->>'platform') platform
, (json_array_elements(data::json)::json->>'score') score
, (json_array_elements(data::json)::json->>'submitId') submit_id
, (json_array_elements(data::json)::json->>'targetId') target_id
, (json_array_elements(data::json)::json->>'targetName') target_name
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='c6e5b45f-35f5-442b-84be-35ff2e76' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='c6e5b45f-35f5-442b-84be-35ff2e76';
\q

View File

@ -0,0 +1,41 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
# 获取当前日期
current_date = datetime.date.today()
previous_date = current_date - datetime.timedelta(days=365)
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ")
print('开始加载数据custom_events_4:获取线索事件_subscribe')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
authRequest=requests.get(authUrl)
auth=json.loads(authRequest.text)
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/customerEvents'
header={}
body={'access_token':auth['access_token'],'limit':'','event':'subscribe','sort':'date','date[ge]':formatted_previous_date,'date[le]':formatted_current_date,}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = '36308871-eea2-49b0-8268-03ab68e3';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'36308871-eea2-49b0-8268-03ab68e3',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束custom_events_4:获取线索事件_subscribe')

View File

@ -0,0 +1,60 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.customer_events_subscribe;
insert into data_api.customer_events_subscribe (
channel_account
, customer_id
, customer_id_str
, date
, event
, external_id
, id
, id_str
, last_updated
, score
, target_id
, target_name
,etl_tx_dt
)
select
case when trim(both from channel_account)='' then null else channel_account::text end channel_account
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
, case when trim(both from date)='' then null else date::text end date
, case when trim(both from event)='' then null else event::text end event
, case when trim(both from external_id)='' then null else external_id::text end external_id
, case when trim(both from id)='' then null else id::text end id
, case when trim(both from id_str)='' then null else id_str::text end id_str
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
, case when trim(both from score)='' then null else score::text end score
, case when trim(both from target_id)='' then null else target_id::text end target_id
, case when trim(both from target_name)='' then null else target_name::text end target_name
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'channelAccount') channel_account
, (json_array_elements(data::json)::json->>'customerId') customer_id
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
, (json_array_elements(data::json)::json->>'date') date
, (json_array_elements(data::json)::json->>'event') event
, (json_array_elements(data::json)::json->>'externalId') external_id
, (json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'idStr') id_str
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
, (json_array_elements(data::json)::json->>'score') score
, (json_array_elements(data::json)::json->>'targetId') target_id
, (json_array_elements(data::json)::json->>'targetName') target_name
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='36308871-eea2-49b0-8268-03ab68e3' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='36308871-eea2-49b0-8268-03ab68e3';
\q

View File

@ -0,0 +1,41 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
# 获取当前日期
current_date = datetime.date.today()
previous_date = current_date - datetime.timedelta(days=365)
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ")
print('开始加载数据custom_events_9:获取线索事件_wechat_scan')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
authRequest=requests.get(authUrl)
auth=json.loads(authRequest.text)
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/customerEvents'
header={}
body={'access_token':auth['access_token'],'limit':'2000','sort':'date','event':'wechat_scan','date[le]':formatted_current_date,'date[ge]':formatted_previous_date,}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = '14f55a0e-1c01-4bd5-9eec-7e0d0bdf';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'14f55a0e-1c01-4bd5-9eec-7e0d0bdf',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束custom_events_9:获取线索事件_wechat_scan')

View File

@ -0,0 +1,60 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.customer_events_wechat_scan;
insert into data_api.customer_events_wechat_scan (
channel_account
, customer_id
, customer_id_str
, date
, event
, external_id
, id
, id_str
, last_updated
, score
, target_id
, target_name
,etl_tx_dt
)
select
case when trim(both from channel_account)='' then null else channel_account::text end channel_account
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
, case when trim(both from date)='' then null else date::text end date
, case when trim(both from event)='' then null else event::text end event
, case when trim(both from external_id)='' then null else external_id::text end external_id
, case when trim(both from id)='' then null else id::text end id
, case when trim(both from id_str)='' then null else id_str::text end id_str
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
, case when trim(both from score)='' then null else score::text end score
, case when trim(both from target_id)='' then null else target_id::text end target_id
, case when trim(both from target_name)='' then null else target_name::text end target_name
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'channelAccount') channel_account
, (json_array_elements(data::json)::json->>'customerId') customer_id
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
, (json_array_elements(data::json)::json->>'date') date
, (json_array_elements(data::json)::json->>'event') event
, (json_array_elements(data::json)::json->>'externalId') external_id
, (json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'idStr') id_str
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
, (json_array_elements(data::json)::json->>'score') score
, (json_array_elements(data::json)::json->>'targetId') target_id
, (json_array_elements(data::json)::json->>'targetName') target_name
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='14f55a0e-1c01-4bd5-9eec-7e0d0bdf' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='14f55a0e-1c01-4bd5-9eec-7e0d0bdf';
\q

View File

@ -0,0 +1,41 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
# 获取当前日期
current_date = datetime.date.today()
previous_date = current_date - datetime.timedelta(days=2)
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ")
print('开始加载数据customer_event_meta:获取线索事件元数据')
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
print('开始请求令牌。')
authRequest=requests.get(authUrl)
auth=json.loads(authRequest.text)
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/metaService/getCustomerEventMeta'
header={}
body={'access_token':auth['access_token'],}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = '057169de-4aa7-4f73-8894-221b4fd0';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'057169de-4aa7-4f73-8894-221b4fd0',json_object,total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束customer_event_meta:获取线索事件元数据')

View File

@ -0,0 +1,33 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.customer_event_meta;
insert into data_api.customer_event_meta (
events
, group_label
, group_name
,etl_tx_dt
)
select
case when trim(both from events)='' then null else events::text end events
, case when trim(both from group_label)='' then null else group_label::text end group_label
, case when trim(both from group_name)='' then null else group_name::text end group_name
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'events') events
, (json_array_elements(data::json)::json->>'groupLabel') group_label
, (json_array_elements(data::json)::json->>'groupName') group_name
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='057169de-4aa7-4f73-8894-221b4fd0' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='057169de-4aa7-4f73-8894-221b4fd0';
\q