add workflow 荟聚API,dev
This commit is contained in:
parent
dfbb29e94c
commit
d2fdd139af
|
@ -0,0 +1,78 @@
|
|||
/*******Main Section**************************************************************************/
|
||||
\set ON_ERROR_STOP on
|
||||
\set AUTOCOMMIT on
|
||||
\timing on
|
||||
delete from p10_sa.S98_S_custom_events_activity_submit
|
||||
;
|
||||
insert into p10_sa.S98_S_custom_events_activity_submit
|
||||
( id
|
||||
, customer_id
|
||||
, event
|
||||
, source
|
||||
, target_name
|
||||
, last_updated
|
||||
, content_name
|
||||
, date
|
||||
, attr1
|
||||
, customer_id_str
|
||||
, external_id
|
||||
, id_str
|
||||
, page_id
|
||||
, target_id
|
||||
, etl_tx_dt )
|
||||
select
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, source
|
||||
, target_name
|
||||
, last_updated
|
||||
, content_name
|
||||
, date
|
||||
, attr1
|
||||
, customer_id_str
|
||||
, external_id
|
||||
, id_str
|
||||
, page_id
|
||||
, target_id
|
||||
, etl_tx_dt
|
||||
from p00_tal.S98_S_custom_events_activity_submit
|
||||
;
|
||||
delete from p12_sfull.S98_S_custom_events_activity_submit
|
||||
;
|
||||
;
|
||||
insert into p12_sfull.S98_S_custom_events_activity_submit
|
||||
( id
|
||||
, customer_id
|
||||
, event
|
||||
, source
|
||||
, target_name
|
||||
, last_updated
|
||||
, content_name
|
||||
, date
|
||||
, attr1
|
||||
, customer_id_str
|
||||
, external_id
|
||||
, id_str
|
||||
, page_id
|
||||
, target_id
|
||||
, etl_tx_dt )
|
||||
select
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, source
|
||||
, target_name
|
||||
, last_updated
|
||||
, content_name
|
||||
, date
|
||||
, attr1
|
||||
, customer_id_str
|
||||
, external_id
|
||||
, id_str
|
||||
, page_id
|
||||
, target_id
|
||||
, etl_tx_dt
|
||||
from p10_sa.S98_S_custom_events_activity_submit
|
||||
;
|
||||
\q
|
|
@ -0,0 +1,26 @@
|
|||
|
||||
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_custom_events_activity_submit (
|
||||
id TEXT
|
||||
, customer_id TEXT
|
||||
, event TEXT
|
||||
, source TEXT
|
||||
, target_name TEXT
|
||||
, last_updated TEXT
|
||||
, content_name TEXT
|
||||
, date TEXT
|
||||
, attr1 TEXT
|
||||
, customer_id_str TEXT
|
||||
, external_id TEXT
|
||||
, id_str TEXT
|
||||
, page_id TEXT
|
||||
, target_id TEXT
|
||||
, etl_tx_dt TIMESTAMP
|
||||
)
|
||||
|
||||
|
||||
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'custom_events_activity_submit' );
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,75 @@
|
|||
|
||||
create table if not exists p10_sa.S98_S_custom_events_activity_submit (
|
||||
id TEXT
|
||||
, customer_id TEXT
|
||||
, event TEXT
|
||||
, source TEXT
|
||||
, target_name TEXT
|
||||
, last_updated TEXT
|
||||
, content_name TEXT
|
||||
, date TEXT
|
||||
, attr1 TEXT
|
||||
, customer_id_str TEXT
|
||||
, external_id TEXT
|
||||
, id_str TEXT
|
||||
, page_id TEXT
|
||||
, target_id TEXT
|
||||
, etl_tx_dt TIMESTAMP
|
||||
) ;
|
||||
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.id IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.customer_id IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.event IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.source IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.target_name IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.last_updated IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.content_name IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.date IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.attr1 IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.customer_id_str IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.external_id IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.id_str IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.page_id IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.target_id IS '';
|
||||
COMMENT ON COLUMN p10_sa.S98_S_custom_events_activity_submit.etl_tx_dt IS '';
|
||||
|
||||
COMMENT ON TABLE p10_sa.S98_S_custom_events_activity_submit IS '';
|
||||
|
||||
|
||||
|
||||
create table if not exists p12_sfull.S98_S_custom_events_activity_submit (
|
||||
id TEXT
|
||||
, customer_id TEXT
|
||||
, event TEXT
|
||||
, source TEXT
|
||||
, target_name TEXT
|
||||
, last_updated TEXT
|
||||
, content_name TEXT
|
||||
, date TEXT
|
||||
, attr1 TEXT
|
||||
, customer_id_str TEXT
|
||||
, external_id TEXT
|
||||
, id_str TEXT
|
||||
, page_id TEXT
|
||||
, target_id TEXT
|
||||
, etl_tx_dt TIMESTAMP
|
||||
) ;
|
||||
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.id IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.customer_id IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.event IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.source IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.target_name IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.last_updated IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.content_name IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.date IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.attr1 IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.customer_id_str IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.external_id IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.id_str IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.page_id IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.target_id IS '';
|
||||
COMMENT ON COLUMN p12_sfull.S98_S_custom_events_activity_submit.etl_tx_dt IS '';
|
||||
|
||||
COMMENT ON TABLE p12_sfull.S98_S_custom_events_activity_submit IS '';
|
||||
|
|
@ -19,27 +19,7 @@ insert into data_api.scrm_contact_merge (
|
|||
, source
|
||||
, tag
|
||||
, c_keyword
|
||||
, attr2
|
||||
, screen_width
|
||||
, ip_city
|
||||
, ip_country
|
||||
, platform
|
||||
, page_type
|
||||
, os_version
|
||||
, browser
|
||||
, browser_version
|
||||
, id_str
|
||||
, os
|
||||
, target_id
|
||||
, customer_id_str
|
||||
, ip
|
||||
, screen_height
|
||||
, external_id
|
||||
, ip_county
|
||||
, page_id
|
||||
, url
|
||||
, ip_province
|
||||
, device
|
||||
, attr2
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
|
@ -56,64 +36,24 @@ select
|
|||
, case when trim(both from source)='' then null else source::text end source
|
||||
, case when trim(both from tag)='' then null else tag::text end tag
|
||||
, case when trim(both from c_keyword)='' then null else c_keyword::text end c_keyword
|
||||
, case when trim(both from attr2)='' then null else attr2::text end attr2
|
||||
, case when trim(both from screen_width)='' then null else screen_width::text end screen_width
|
||||
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
|
||||
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
|
||||
, case when trim(both from platform)='' then null else platform::text end platform
|
||||
, case when trim(both from page_type)='' then null else page_type::text end page_type
|
||||
, case when trim(both from os_version)='' then null else os_version::text end os_version
|
||||
, case when trim(both from browser)='' then null else browser::text end browser
|
||||
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from os)='' then null else os::text end os
|
||||
, case when trim(both from target_id)='' then null else target_id::text end target_id
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from ip)='' then null else ip::text end ip
|
||||
, case when trim(both from screen_height)='' then null else screen_height::text end screen_height
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from ip_county)='' then null else ip_county::text end ip_county
|
||||
, case when trim(both from page_id)='' then null else page_id::text end page_id
|
||||
, case when trim(both from url)='' then null else url::text end url
|
||||
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
|
||||
, case when trim(both from device)='' then null else device::text end device
|
||||
, case when trim(both from attr2)='' then null else attr2::text end attr2
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'date') mergedCustomerId
|
||||
(json_array_elements(data::json)::json->>'mergedCustomerId') mergedCustomerId
|
||||
, (json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'targetName') target_name
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'pageId') c_name
|
||||
, (json_array_elements(data::json)::json->>'date') c_type
|
||||
, (json_array_elements(data::json)::json->>'date') contentName
|
||||
, (json_array_elements(data::json)::json->>'date') source
|
||||
, (json_array_elements(data::json)::json->>'c_name') c_name
|
||||
, (json_array_elements(data::json)::json->>'c_type') c_type
|
||||
, (json_array_elements(data::json)::json->>'contentName') contentName
|
||||
, (json_array_elements(data::json)::json->>'source') source
|
||||
, (json_array_elements(data::json)::json->>'tag') tag
|
||||
, (json_array_elements(data::json)::json->>'date') c_keyword
|
||||
, (json_array_elements(data::json)::json->>'date') attr2
|
||||
, (json_array_elements(data::json)::json->>'screenWidth') screen_width
|
||||
, (json_array_elements(data::json)::json->>'ipCity') ip_city
|
||||
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
|
||||
, (json_array_elements(data::json)::json->>'platform') platform
|
||||
, (json_array_elements(data::json)::json->>'pageType') page_type
|
||||
, (json_array_elements(data::json)::json->>'osVersion') os_version
|
||||
, (json_array_elements(data::json)::json->>'browser') browser
|
||||
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'os') os
|
||||
, (json_array_elements(data::json)::json->>'targetId') target_id
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'ip') ip
|
||||
, (json_array_elements(data::json)::json->>'screenHeight') screen_height
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'ipCounty') ip_county
|
||||
, (json_array_elements(data::json)::json->>'pageId') page_id
|
||||
, (json_array_elements(data::json)::json->>'url') url
|
||||
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
|
||||
, (json_array_elements(data::json)::json->>'device') device
|
||||
, (json_array_elements(data::json)::json->>'c_keyword') c_keyword
|
||||
, (json_array_elements(data::json)::json->>'attr2') attr2
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='ff5c7bf6-0d18-4201-9501-8fdd6152' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
|
|
@ -168,177 +168,6 @@ depends_on_past=False,
|
|||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_9_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_9_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_9_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_9_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_9_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_9_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_9_feign >> custom_events_9_load
|
||||
|
||||
custom_events_6_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_6_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_6_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_6_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_6_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_6_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_6_feign >> custom_events_6_load
|
||||
|
||||
custom_events_4_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_4_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_4_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_4_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_4_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_4_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_4_feign >> custom_events_4_load
|
||||
|
||||
custom_events_8_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_8_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_8_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_8_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_8_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_8_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_8_feign >> custom_events_8_load
|
||||
|
||||
custom_events_3_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_3_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_3_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_3_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_3_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_3_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_3_feign >> custom_events_3_load
|
||||
|
||||
custom_events_1_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_1_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_1_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_1_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_1_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_1_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_1_feign >> custom_events_1_load
|
||||
|
||||
custom_events_7_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_7_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_7_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_7_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_7_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_7_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_7_feign >> custom_events_7_load
|
||||
|
||||
custom_events_2_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_2_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_2_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_2_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_2_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_2_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_2_feign >> custom_events_2_load
|
||||
|
||||
custom_events_5_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_5_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_5_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_5_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_5_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_5_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_5_feign >> custom_events_5_load
|
||||
|
||||
customer_events_submit_form_4283 = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='customer_events_submit_form_4283',
|
||||
|
@ -429,18 +258,218 @@ depends_on_past=False,
|
|||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_activity_submit_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_activity_submit_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_activity_submit_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_activity_submit_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_activity_submit_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_activity_submit_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_activity_submit_feign >> custom_events_activity_submit_load
|
||||
|
||||
custom_events_open_page_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_open_page_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_open_page_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_open_page_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_open_page_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_open_page_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_open_page_feign >> custom_events_open_page_load
|
||||
|
||||
custom_events_open_content_page_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_open_content_page_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_open_content_page_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_open_content_page_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_open_content_page_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_open_content_page_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_open_content_page_feign >> custom_events_open_content_page_load
|
||||
|
||||
custom_events_click_link_in_page_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_click_link_in_page_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_click_link_in_page_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_click_link_in_page_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_click_link_in_page_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_click_link_in_page_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_click_link_in_page_feign >> custom_events_click_link_in_page_load
|
||||
|
||||
custom_events_subscribe_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_subscribe_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_subscribe_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_subscribe_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_subscribe_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_subscribe_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_subscribe_feign >> custom_events_subscribe_load
|
||||
|
||||
custom_events_wechat_scan_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_wechat_scan_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_wechat_scan_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_wechat_scan_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_wechat_scan_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_wechat_scan_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_wechat_scan_feign >> custom_events_wechat_scan_load
|
||||
|
||||
custom_events_submit_form_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_submit_form_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_submit_form_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_submit_form_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_submit_form_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_submit_form_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_submit_form_feign >> custom_events_submit_form_load
|
||||
|
||||
custom_events_open_app_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_open_app_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_open_app_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_open_app_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_open_app_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_open_app_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_open_app_feign >> custom_events_open_app_load
|
||||
|
||||
custom_events_c_minipro_page_view_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_c_minipro_page_view_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_c_minipro_page_view_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_c_minipro_page_view_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_c_minipro_page_view_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_c_minipro_page_view_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_c_minipro_page_view_feign >> custom_events_c_minipro_page_view_load
|
||||
|
||||
custom_events_cp_employee_tools_huiju_event_add_feign = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_cp_employee_tools_huiju_event_add_feign',
|
||||
command='python3 /data/airflow/etl/API/custom_events_cp_employee_tools_huiju_event_add_feign.py',
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_cp_employee_tools_huiju_event_add_load = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_cp_employee_tools_huiju_event_add_load',
|
||||
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||
params={'my_param':"custom_events_cp_employee_tools_huiju_event_add_load"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
custom_events_cp_employee_tools_huiju_event_add_feign >> custom_events_cp_employee_tools_huiju_event_add_load
|
||||
|
||||
custom_events_activity_submit_1455 = SSHOperator(
|
||||
ssh_hook=sshHook,
|
||||
task_id='custom_events_activity_submit_1455',
|
||||
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
|
||||
params={'my_param':"S98_S_custom_events_activity_submit"},
|
||||
depends_on_past=False,
|
||||
retries=3,
|
||||
dag=dag)
|
||||
|
||||
scrm_contact_load >> scrm_contact_1129
|
||||
update_scrm_contact_load >> scrm_contact_lastupdated_4112
|
||||
customer_event_meta_load >> customer_event_meta_2268
|
||||
customer_events_load >> customer_events_3292
|
||||
custom_events_9_load >> customer_events_wechat_scan_2105
|
||||
custom_events_6_load >> customer_events_submit_form_4283
|
||||
custom_events_4_load >> customer_events_subscribe_5997
|
||||
custom_events_8_load >> customer_events_click_link_in_page_2034
|
||||
custom_events_1_load >> customer_events_open_app_5878
|
||||
custom_events_2_load >> customer_events_open_content_page_9684
|
||||
custom_events_3_load >> customer_events_open_page_7519
|
||||
custom_events_5_load >> customer_events_c_minipro_page_view_9309
|
||||
custom_events_7_load >> customer_events_add_user_9393
|
||||
custom_events_update_load >> scrm_contact_merge_6671
|
||||
scrm_contact_merge_6671 >> task_failed
|
||||
custom_events_open_page_load >> customer_events_open_page_7519
|
||||
custom_events_open_content_page_load >> customer_events_open_content_page_9684
|
||||
custom_events_click_link_in_page_load >> customer_events_click_link_in_page_2034
|
||||
custom_events_subscribe_load >> customer_events_subscribe_5997
|
||||
custom_events_wechat_scan_load >> customer_events_wechat_scan_2105
|
||||
custom_events_c_minipro_page_view_load >> customer_events_c_minipro_page_view_9309
|
||||
custom_events_submit_form_load >> customer_events_submit_form_4283
|
||||
custom_events_open_app_load >> customer_events_open_app_5878
|
||||
custom_events_cp_employee_tools_huiju_event_add_load >> customer_events_add_user_9393
|
||||
custom_events_activity_submit_load >> custom_events_activity_submit_1455
|
||||
custom_events_activity_submit_1455 >> task_failed
|
||||
|
|
|
@ -6,45 +6,45 @@
|
|||
DELETE FROM data_api.customer_events;
|
||||
|
||||
insert into data_api.customer_events (
|
||||
content_name
|
||||
, create_method
|
||||
id
|
||||
, customer_id
|
||||
, customer_id_str
|
||||
, date
|
||||
, event
|
||||
, external_id
|
||||
, id
|
||||
, id_str
|
||||
, last_updated
|
||||
, source
|
||||
, source
|
||||
, content_name
|
||||
, date
|
||||
, create_method
|
||||
, customer_id_str
|
||||
, external_id
|
||||
, id_str
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
case when trim(both from content_name)='' then null else content_name::text end content_name
|
||||
, case when trim(both from create_method)='' then null else create_method::text end create_method
|
||||
case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from event)='' then null else event::text end event
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||
, case when trim(both from source)='' then null else source::text end source
|
||||
, case when trim(both from source)='' then null else source::text end source
|
||||
, case when trim(both from content_name)='' then null else content_name::text end content_name
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from create_method)='' then null else create_method::text end create_method
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'contentName') content_name
|
||||
, (json_array_elements(data::json)::json->>'createMethod') create_method
|
||||
(json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'source') source
|
||||
, (json_array_elements(data::json)::json->>'source') source
|
||||
, (json_array_elements(data::json)::json->>'contentName') content_name
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'createMethod') create_method
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='747d33fa-a0e0-421d-aa9b-4ca4517b' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
# coding: utf-8
|
||||
import requests
|
||||
import json
|
||||
import psycopg2
|
||||
import uuid
|
||||
import datetime
|
||||
import time
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
#全局变量,便于参数使用的预设值
|
||||
current_date = datetime.date.today() # 获取当前日期
|
||||
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||
sign_version = 'v2' # 签名版本号,固定值v2
|
||||
nonce = str(uuid.uuid4())
|
||||
|
||||
#获取签名令牌
|
||||
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||
# 按照指定的格式拼接字符串
|
||||
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||
# 使用SHA256算法计算哈希值
|
||||
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||
return sha256_hash
|
||||
|
||||
#获取鉴权token
|
||||
def get_token(url):
|
||||
#请求鉴权接口
|
||||
authRequest=requests.get(url)
|
||||
#解析结果
|
||||
if not authRequest: #若为空时,返回空
|
||||
return
|
||||
auth=json.loads(authRequest.text)
|
||||
return auth
|
||||
|
||||
print('开始加载数据:custom_events_activity_submit:获取线索事件_activity_submit')
|
||||
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||
|
||||
print('开始请求令牌。')
|
||||
#authRequest=requests.get(authUrl)
|
||||
#auth=json.loads(authRequest.text)
|
||||
auth = get_token(authUrl)
|
||||
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||
i = 0
|
||||
while not auth and i < 60:
|
||||
time.sleep(60)
|
||||
auth = get_token(authUrl)
|
||||
i = i + 1
|
||||
print('开始请求数据总数。')
|
||||
url='https://api.huiju.cool/v2/customerEvents'
|
||||
header={}
|
||||
body={'access_token':auth['access_token'],'limit':'2000','sort':'date','event':'activity_submit','date[le]':formatted_current_date,}
|
||||
dataReqL=requests.get(url,headers=header,params=body)
|
||||
resL=json.loads(dataReqL.text)
|
||||
# print(resL)
|
||||
dataList=resL['data']
|
||||
total=len(dataList)
|
||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||
host="172.17.0.8", port="5432")
|
||||
print('数据库连接成功')
|
||||
dataId=str(uuid.uuid4())
|
||||
print('临时id:'+dataId)
|
||||
json_object = json.dumps(dataList)
|
||||
cur=conn.cursor()
|
||||
sql="update data_api.api_data set is_loaded = '1' where api_id = '6229a6c5-c504-4723-85a6-f7471216';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||
cur.execute(sql,[dataId,'6229a6c5-c504-4723-85a6-f7471216',json_object,total])
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
print('加载数据结束:custom_events_activity_submit:获取线索事件_activity_submit')
|
|
@ -0,0 +1,66 @@
|
|||
/*******Main Section**************************************************************************/
|
||||
\set ON_ERROR_STOP on
|
||||
\set AUTOCOMMIT on
|
||||
\timing on
|
||||
|
||||
DELETE FROM data_api.custom_events_activity_submit;
|
||||
|
||||
insert into data_api.custom_events_activity_submit (
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, source
|
||||
, target_name
|
||||
, last_updated
|
||||
, content_name
|
||||
, date
|
||||
, attr1
|
||||
, customer_id_str
|
||||
, external_id
|
||||
, id_str
|
||||
, page_id
|
||||
, target_id
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
|
||||
, case when trim(both from event)='' then null else event::text end event
|
||||
, case when trim(both from source)='' then null else source::text end source
|
||||
, case when trim(both from target_name)='' then null else target_name::text end target_name
|
||||
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||
, case when trim(both from content_name)='' then null else content_name::text end content_name
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from attr1)='' then null else attr1::text end attr1
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from page_id)='' then null else page_id::text end page_id
|
||||
, case when trim(both from target_id)='' then null else target_id::text end target_id
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'source') source
|
||||
, (json_array_elements(data::json)::json->>'targetName') target_name
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'contentName') content_name
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'attr1') attr1
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'pageId') page_id
|
||||
, (json_array_elements(data::json)::json->>'targetId') target_id
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='6229a6c5-c504-4723-85a6-f7471216' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
||||
update data_api.api_data
|
||||
set is_loaded = '1' ,
|
||||
status = '1',
|
||||
request_tm = current_timestamp(0)
|
||||
where api_id='6229a6c5-c504-4723-85a6-f7471216';
|
||||
\q
|
|
@ -0,0 +1,72 @@
|
|||
# coding: utf-8
|
||||
import requests
|
||||
import json
|
||||
import psycopg2
|
||||
import uuid
|
||||
import datetime
|
||||
import time
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
#全局变量,便于参数使用的预设值
|
||||
current_date = datetime.date.today() # 获取当前日期
|
||||
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||
sign_version = 'v2' # 签名版本号,固定值v2
|
||||
nonce = str(uuid.uuid4())
|
||||
|
||||
#获取签名令牌
|
||||
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||
# 按照指定的格式拼接字符串
|
||||
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||
# 使用SHA256算法计算哈希值
|
||||
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||
return sha256_hash
|
||||
|
||||
#获取鉴权token
|
||||
def get_token(url):
|
||||
#请求鉴权接口
|
||||
authRequest=requests.get(url)
|
||||
#解析结果
|
||||
if not authRequest: #若为空时,返回空
|
||||
return
|
||||
auth=json.loads(authRequest.text)
|
||||
return auth
|
||||
|
||||
print('开始加载数据:custom_events_c_minipro_page_view:获取线索事件_c_minipro_page_view')
|
||||
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||
|
||||
print('开始请求令牌。')
|
||||
#authRequest=requests.get(authUrl)
|
||||
#auth=json.loads(authRequest.text)
|
||||
auth = get_token(authUrl)
|
||||
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||
i = 0
|
||||
while not auth and i < 60:
|
||||
time.sleep(60)
|
||||
auth = get_token(authUrl)
|
||||
i = i + 1
|
||||
print('开始请求数据总数。')
|
||||
url='https://api.huiju.cool/v2/customerEvents'
|
||||
header={}
|
||||
body={'access_token':auth['access_token'],'limit':'2000','event':'c_minipro_page_view','sort':'date','date[ge]':formatted_previous_date,'date[le]':formatted_current_date,}
|
||||
dataReqL=requests.get(url,headers=header,params=body)
|
||||
resL=json.loads(dataReqL.text)
|
||||
# print(resL)
|
||||
dataList=resL['data']
|
||||
total=len(dataList)
|
||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||
host="172.17.0.8", port="5432")
|
||||
print('数据库连接成功')
|
||||
dataId=str(uuid.uuid4())
|
||||
print('临时id:'+dataId)
|
||||
json_object = json.dumps(dataList)
|
||||
cur=conn.cursor()
|
||||
sql="update data_api.api_data set is_loaded = '1' where api_id = 'a71a43ff-efcd-4427-8dbc-ed2686e4';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||
cur.execute(sql,[dataId,'a71a43ff-efcd-4427-8dbc-ed2686e4',json_object,total])
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
print('加载数据结束:custom_events_c_minipro_page_view:获取线索事件_c_minipro_page_view')
|
|
@ -0,0 +1,93 @@
|
|||
/*******Main Section**************************************************************************/
|
||||
\set ON_ERROR_STOP on
|
||||
\set AUTOCOMMIT on
|
||||
\timing on
|
||||
|
||||
DELETE FROM data_api.customer_events_c_minipro_page_view;
|
||||
|
||||
insert into data_api.customer_events_c_minipro_page_view (
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, c_name
|
||||
, last_updated
|
||||
, date
|
||||
, browser
|
||||
, browser_version
|
||||
, c_source_content
|
||||
, c_url
|
||||
, customer_id_str
|
||||
, external_id
|
||||
, id_str
|
||||
, ip
|
||||
, ip_city
|
||||
, ip_country
|
||||
, ip_province
|
||||
, os
|
||||
, os_version
|
||||
, platform
|
||||
, target_id
|
||||
, target_name
|
||||
, url
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
|
||||
, case when trim(both from event)='' then null else event::text end event
|
||||
, case when trim(both from c_name)='' then null else c_name::text end c_name
|
||||
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from browser)='' then null else browser::text end browser
|
||||
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
|
||||
, case when trim(both from c_source_content)='' then null else c_source_content::text end c_source_content
|
||||
, case when trim(both from c_url)='' then null else c_url::text end c_url
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from ip)='' then null else ip::text end ip
|
||||
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
|
||||
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
|
||||
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
|
||||
, case when trim(both from os)='' then null else os::text end os
|
||||
, case when trim(both from os_version)='' then null else os_version::text end os_version
|
||||
, case when trim(both from platform)='' then null else platform::text end platform
|
||||
, case when trim(both from target_id)='' then null else target_id::text end target_id
|
||||
, case when trim(both from target_name)='' then null else target_name::text end target_name
|
||||
, case when trim(both from url)='' then null else url::text end url
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'c_name') c_name
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'browser') browser
|
||||
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
|
||||
, (json_array_elements(data::json)::json->>'c_sourceContent') c_source_content
|
||||
, (json_array_elements(data::json)::json->>'c_url') c_url
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'ip') ip
|
||||
, (json_array_elements(data::json)::json->>'ipCity') ip_city
|
||||
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
|
||||
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
|
||||
, (json_array_elements(data::json)::json->>'os') os
|
||||
, (json_array_elements(data::json)::json->>'osVersion') os_version
|
||||
, (json_array_elements(data::json)::json->>'platform') platform
|
||||
, (json_array_elements(data::json)::json->>'targetId') target_id
|
||||
, (json_array_elements(data::json)::json->>'targetName') target_name
|
||||
, (json_array_elements(data::json)::json->>'url') url
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='a71a43ff-efcd-4427-8dbc-ed2686e4' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
||||
update data_api.api_data
|
||||
set is_loaded = '1' ,
|
||||
status = '1',
|
||||
request_tm = current_timestamp(0)
|
||||
where api_id='a71a43ff-efcd-4427-8dbc-ed2686e4';
|
||||
\q
|
|
@ -0,0 +1,72 @@
|
|||
# coding: utf-8
|
||||
import requests
|
||||
import json
|
||||
import psycopg2
|
||||
import uuid
|
||||
import datetime
|
||||
import time
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
#全局变量,便于参数使用的预设值
|
||||
current_date = datetime.date.today() # 获取当前日期
|
||||
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||
sign_version = 'v2' # 签名版本号,固定值v2
|
||||
nonce = str(uuid.uuid4())
|
||||
|
||||
#获取签名令牌
|
||||
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||
# 按照指定的格式拼接字符串
|
||||
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||
# 使用SHA256算法计算哈希值
|
||||
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||
return sha256_hash
|
||||
|
||||
#获取鉴权token
|
||||
def get_token(url):
|
||||
#请求鉴权接口
|
||||
authRequest=requests.get(url)
|
||||
#解析结果
|
||||
if not authRequest: #若为空时,返回空
|
||||
return
|
||||
auth=json.loads(authRequest.text)
|
||||
return auth
|
||||
|
||||
print('开始加载数据:custom_events_click_link_in_page:获取线索事件_click_link_in_page')
|
||||
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||
|
||||
print('开始请求令牌。')
|
||||
#authRequest=requests.get(authUrl)
|
||||
#auth=json.loads(authRequest.text)
|
||||
auth = get_token(authUrl)
|
||||
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||
i = 0
|
||||
while not auth and i < 60:
|
||||
time.sleep(60)
|
||||
auth = get_token(authUrl)
|
||||
i = i + 1
|
||||
print('开始请求数据总数。')
|
||||
url='https://api.huiju.cool/v2/customerEvents'
|
||||
header={}
|
||||
body={'sort':'date','date[le]':formatted_current_date,'limit':'2000','event':'click_link_in_page','access_token':auth['access_token'],'date[ge]':formatted_previous_date,}
|
||||
dataReqL=requests.get(url,headers=header,params=body)
|
||||
resL=json.loads(dataReqL.text)
|
||||
# print(resL)
|
||||
dataList=resL['data']
|
||||
total=len(dataList)
|
||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||
host="172.17.0.8", port="5432")
|
||||
print('数据库连接成功')
|
||||
dataId=str(uuid.uuid4())
|
||||
print('临时id:'+dataId)
|
||||
json_object = json.dumps(dataList)
|
||||
cur=conn.cursor()
|
||||
sql="update data_api.api_data set is_loaded = '1' where api_id = 'b20bb19a-6ace-4d10-ab2b-4cdb3c0a';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||
cur.execute(sql,[dataId,'b20bb19a-6ace-4d10-ab2b-4cdb3c0a',json_object,total])
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
print('加载数据结束:custom_events_click_link_in_page:获取线索事件_click_link_in_page')
|
|
@ -0,0 +1,102 @@
|
|||
/*******Main Section**************************************************************************/
|
||||
\set ON_ERROR_STOP on
|
||||
\set AUTOCOMMIT on
|
||||
\timing on
|
||||
|
||||
DELETE FROM data_api.customer_events_click_link_in_page;
|
||||
|
||||
insert into data_api.customer_events_click_link_in_page (
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, last_updated
|
||||
, date
|
||||
, browser
|
||||
, browser_version
|
||||
, customer_id_str
|
||||
, device
|
||||
, external_id
|
||||
, id_str
|
||||
, ip
|
||||
, ip_city
|
||||
, ip_country
|
||||
, ip_province
|
||||
, os
|
||||
, os_version
|
||||
, page_id
|
||||
, page_type
|
||||
, platform
|
||||
, score
|
||||
, screen_height
|
||||
, screen_width
|
||||
, target_id
|
||||
, target_name
|
||||
, url
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
|
||||
, case when trim(both from event)='' then null else event::text end event
|
||||
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from browser)='' then null else browser::text end browser
|
||||
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from device)='' then null else device::text end device
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from ip)='' then null else ip::text end ip
|
||||
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
|
||||
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
|
||||
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
|
||||
, case when trim(both from os)='' then null else os::text end os
|
||||
, case when trim(both from os_version)='' then null else os_version::text end os_version
|
||||
, case when trim(both from page_id)='' then null else page_id::text end page_id
|
||||
, case when trim(both from page_type)='' then null else page_type::text end page_type
|
||||
, case when trim(both from platform)='' then null else platform::text end platform
|
||||
, case when trim(both from score)='' then null else score::text end score
|
||||
, case when trim(both from screen_height)='' then null else screen_height::text end screen_height
|
||||
, case when trim(both from screen_width)='' then null else screen_width::text end screen_width
|
||||
, case when trim(both from target_id)='' then null else target_id::text end target_id
|
||||
, case when trim(both from target_name)='' then null else target_name::text end target_name
|
||||
, case when trim(both from url)='' then null else url::text end url
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'browser') browser
|
||||
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'device') device
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'ip') ip
|
||||
, (json_array_elements(data::json)::json->>'ipCity') ip_city
|
||||
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
|
||||
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
|
||||
, (json_array_elements(data::json)::json->>'os') os
|
||||
, (json_array_elements(data::json)::json->>'osVersion') os_version
|
||||
, (json_array_elements(data::json)::json->>'pageId') page_id
|
||||
, (json_array_elements(data::json)::json->>'pageType') page_type
|
||||
, (json_array_elements(data::json)::json->>'platform') platform
|
||||
, (json_array_elements(data::json)::json->>'score') score
|
||||
, (json_array_elements(data::json)::json->>'screenHeight') screen_height
|
||||
, (json_array_elements(data::json)::json->>'screenWidth') screen_width
|
||||
, (json_array_elements(data::json)::json->>'targetId') target_id
|
||||
, (json_array_elements(data::json)::json->>'targetName') target_name
|
||||
, (json_array_elements(data::json)::json->>'url') url
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='b20bb19a-6ace-4d10-ab2b-4cdb3c0a' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
||||
update data_api.api_data
|
||||
set is_loaded = '1' ,
|
||||
status = '1',
|
||||
request_tm = current_timestamp(0)
|
||||
where api_id='b20bb19a-6ace-4d10-ab2b-4cdb3c0a';
|
||||
\q
|
|
@ -0,0 +1,72 @@
|
|||
# coding: utf-8
|
||||
import requests
|
||||
import json
|
||||
import psycopg2
|
||||
import uuid
|
||||
import datetime
|
||||
import time
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
#全局变量,便于参数使用的预设值
|
||||
current_date = datetime.date.today() # 获取当前日期
|
||||
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||
sign_version = 'v2' # 签名版本号,固定值v2
|
||||
nonce = str(uuid.uuid4())
|
||||
|
||||
#获取签名令牌
|
||||
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||
# 按照指定的格式拼接字符串
|
||||
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||
# 使用SHA256算法计算哈希值
|
||||
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||
return sha256_hash
|
||||
|
||||
#获取鉴权token
|
||||
def get_token(url):
|
||||
#请求鉴权接口
|
||||
authRequest=requests.get(url)
|
||||
#解析结果
|
||||
if not authRequest: #若为空时,返回空
|
||||
return
|
||||
auth=json.loads(authRequest.text)
|
||||
return auth
|
||||
|
||||
print('开始加载数据:custom_events_cp_employee_tools_huiju_event_add:获取线索事件_cp_employee_tools_huiju_event_add')
|
||||
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||
|
||||
print('开始请求令牌。')
|
||||
#authRequest=requests.get(authUrl)
|
||||
#auth=json.loads(authRequest.text)
|
||||
auth = get_token(authUrl)
|
||||
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||
i = 0
|
||||
while not auth and i < 60:
|
||||
time.sleep(60)
|
||||
auth = get_token(authUrl)
|
||||
i = i + 1
|
||||
print('开始请求数据总数。')
|
||||
url='https://api.huiju.cool/v2/customerEvents'
|
||||
header={}
|
||||
body={'access_token':auth['access_token'],'sort':'date','limit':'2000','event':'cp_employee_tools_huiju_event_add_user','date[le]':formatted_current_date,'date[ge]':formatted_previous_date,}
|
||||
dataReqL=requests.get(url,headers=header,params=body)
|
||||
resL=json.loads(dataReqL.text)
|
||||
# print(resL)
|
||||
dataList=resL['data']
|
||||
total=len(dataList)
|
||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||
host="172.17.0.8", port="5432")
|
||||
print('数据库连接成功')
|
||||
dataId=str(uuid.uuid4())
|
||||
print('临时id:'+dataId)
|
||||
json_object = json.dumps(dataList)
|
||||
cur=conn.cursor()
|
||||
sql="update data_api.api_data set is_loaded = '1' where api_id = 'd3b5116a-08c3-431f-8d96-9780221b';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||
cur.execute(sql,[dataId,'d3b5116a-08c3-431f-8d96-9780221b',json_object,total])
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
print('加载数据结束:custom_events_cp_employee_tools_huiju_event_add:获取线索事件_cp_employee_tools_huiju_event_add')
|
|
@ -0,0 +1,72 @@
|
|||
/*******Main Section**************************************************************************/
|
||||
\set ON_ERROR_STOP on
|
||||
\set AUTOCOMMIT on
|
||||
\timing on
|
||||
|
||||
DELETE FROM data_api.customer_events_add_user;
|
||||
|
||||
insert into data_api.customer_events_add_user (
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, source
|
||||
, last_updated
|
||||
, date
|
||||
, attr2
|
||||
, attr1
|
||||
, attr5
|
||||
, attr6
|
||||
, attr7
|
||||
, customer_id_str
|
||||
, external_id
|
||||
, id_str
|
||||
, summary
|
||||
, target_id
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
|
||||
, case when trim(both from event)='' then null else event::text end event
|
||||
, case when trim(both from source)='' then null else source::text end source
|
||||
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from attr2)='' then null else attr2::text end attr2
|
||||
, case when trim(both from attr1)='' then null else attr1::text end attr1
|
||||
, case when trim(both from attr5)='' then null else attr5::text end attr5
|
||||
, case when trim(both from attr6)='' then null else attr6::text end attr6
|
||||
, case when trim(both from attr7)='' then null else attr7::text end attr7
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from summary)='' then null else summary::text end summary
|
||||
, case when trim(both from target_id)='' then null else target_id::text end target_id
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'source') source
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'attr2') attr2
|
||||
, (json_array_elements(data::json)::json->>'attr1') attr1
|
||||
, (json_array_elements(data::json)::json->>'attr5') attr5
|
||||
, (json_array_elements(data::json)::json->>'attr6') attr6
|
||||
, (json_array_elements(data::json)::json->>'attr7') attr7
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'summary') summary
|
||||
, (json_array_elements(data::json)::json->>'targetId') target_id
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='d3b5116a-08c3-431f-8d96-9780221b' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
||||
update data_api.api_data
|
||||
set is_loaded = '1' ,
|
||||
status = '1',
|
||||
request_tm = current_timestamp(0)
|
||||
where api_id='d3b5116a-08c3-431f-8d96-9780221b';
|
||||
\q
|
|
@ -0,0 +1,72 @@
|
|||
# coding: utf-8
|
||||
import requests
|
||||
import json
|
||||
import psycopg2
|
||||
import uuid
|
||||
import datetime
|
||||
import time
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
#全局变量,便于参数使用的预设值
|
||||
current_date = datetime.date.today() # 获取当前日期
|
||||
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||
sign_version = 'v2' # 签名版本号,固定值v2
|
||||
nonce = str(uuid.uuid4())
|
||||
|
||||
#获取签名令牌
|
||||
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||
# 按照指定的格式拼接字符串
|
||||
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||
# 使用SHA256算法计算哈希值
|
||||
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||
return sha256_hash
|
||||
|
||||
#获取鉴权token
|
||||
def get_token(url):
|
||||
#请求鉴权接口
|
||||
authRequest=requests.get(url)
|
||||
#解析结果
|
||||
if not authRequest: #若为空时,返回空
|
||||
return
|
||||
auth=json.loads(authRequest.text)
|
||||
return auth
|
||||
|
||||
print('开始加载数据:custom_events_open_app:获取线索事件_open_app')
|
||||
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||
|
||||
print('开始请求令牌。')
|
||||
#authRequest=requests.get(authUrl)
|
||||
#auth=json.loads(authRequest.text)
|
||||
auth = get_token(authUrl)
|
||||
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||
i = 0
|
||||
while not auth and i < 60:
|
||||
time.sleep(60)
|
||||
auth = get_token(authUrl)
|
||||
i = i + 1
|
||||
print('开始请求数据总数。')
|
||||
url='https://api.huiju.cool/v2/customerEvents'
|
||||
header={}
|
||||
body={'access_token':auth['access_token'],'sort':'date','limit':'2000','date[ge]':formatted_previous_date,'date[le]':formatted_current_date,'event':'open_app',}
|
||||
dataReqL=requests.get(url,headers=header,params=body)
|
||||
resL=json.loads(dataReqL.text)
|
||||
# print(resL)
|
||||
dataList=resL['data']
|
||||
total=len(dataList)
|
||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||
host="172.17.0.8", port="5432")
|
||||
print('数据库连接成功')
|
||||
dataId=str(uuid.uuid4())
|
||||
print('临时id:'+dataId)
|
||||
json_object = json.dumps(dataList)
|
||||
cur=conn.cursor()
|
||||
sql="update data_api.api_data set is_loaded = '1' where api_id = '2433868c-3416-4475-884d-11e687a6';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||
cur.execute(sql,[dataId,'2433868c-3416-4475-884d-11e687a6',json_object,total])
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
print('加载数据结束:custom_events_open_app:获取线索事件_open_app')
|
|
@ -0,0 +1,99 @@
|
|||
/*******Main Section**************************************************************************/
|
||||
\set ON_ERROR_STOP on
|
||||
\set AUTOCOMMIT on
|
||||
\timing on
|
||||
|
||||
DELETE FROM data_api.customer_events_open_app;
|
||||
|
||||
insert into data_api.customer_events_open_app (
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, last_updated
|
||||
, date
|
||||
, browser
|
||||
, browser_version
|
||||
, customer_id_str
|
||||
, device
|
||||
, external_id
|
||||
, id_str
|
||||
, ip
|
||||
, ip_city
|
||||
, ip_country
|
||||
, ip_province
|
||||
, os
|
||||
, os_version
|
||||
, page_id
|
||||
, page_type
|
||||
, platform
|
||||
, screen_height
|
||||
, screen_width
|
||||
, target_id
|
||||
, target_name
|
||||
, url
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
|
||||
, case when trim(both from event)='' then null else event::text end event
|
||||
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from browser)='' then null else browser::text end browser
|
||||
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from device)='' then null else device::text end device
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from ip)='' then null else ip::text end ip
|
||||
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
|
||||
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
|
||||
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
|
||||
, case when trim(both from os)='' then null else os::text end os
|
||||
, case when trim(both from os_version)='' then null else os_version::text end os_version
|
||||
, case when trim(both from page_id)='' then null else page_id::text end page_id
|
||||
, case when trim(both from page_type)='' then null else page_type::text end page_type
|
||||
, case when trim(both from platform)='' then null else platform::text end platform
|
||||
, case when trim(both from screen_height)='' then null else screen_height::text end screen_height
|
||||
, case when trim(both from screen_width)='' then null else screen_width::text end screen_width
|
||||
, case when trim(both from target_id)='' then null else target_id::text end target_id
|
||||
, case when trim(both from target_name)='' then null else target_name::text end target_name
|
||||
, case when trim(both from url)='' then null else url::text end url
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'browser') browser
|
||||
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'device') device
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'ip') ip
|
||||
, (json_array_elements(data::json)::json->>'ipCity') ip_city
|
||||
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
|
||||
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
|
||||
, (json_array_elements(data::json)::json->>'os') os
|
||||
, (json_array_elements(data::json)::json->>'osVersion') os_version
|
||||
, (json_array_elements(data::json)::json->>'pageId') page_id
|
||||
, (json_array_elements(data::json)::json->>'pageType') page_type
|
||||
, (json_array_elements(data::json)::json->>'platform') platform
|
||||
, (json_array_elements(data::json)::json->>'screenHeight') screen_height
|
||||
, (json_array_elements(data::json)::json->>'screenWidth') screen_width
|
||||
, (json_array_elements(data::json)::json->>'targetId') target_id
|
||||
, (json_array_elements(data::json)::json->>'targetName') target_name
|
||||
, (json_array_elements(data::json)::json->>'url') url
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='2433868c-3416-4475-884d-11e687a6' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
||||
update data_api.api_data
|
||||
set is_loaded = '1' ,
|
||||
status = '1',
|
||||
request_tm = current_timestamp(0)
|
||||
where api_id='2433868c-3416-4475-884d-11e687a6';
|
||||
\q
|
|
@ -0,0 +1,72 @@
|
|||
# coding: utf-8
|
||||
import requests
|
||||
import json
|
||||
import psycopg2
|
||||
import uuid
|
||||
import datetime
|
||||
import time
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
#全局变量,便于参数使用的预设值
|
||||
current_date = datetime.date.today() # 获取当前日期
|
||||
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||
sign_version = 'v2' # 签名版本号,固定值v2
|
||||
nonce = str(uuid.uuid4())
|
||||
|
||||
#获取签名令牌
|
||||
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||
# 按照指定的格式拼接字符串
|
||||
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||
# 使用SHA256算法计算哈希值
|
||||
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||
return sha256_hash
|
||||
|
||||
#获取鉴权token
|
||||
def get_token(url):
|
||||
#请求鉴权接口
|
||||
authRequest=requests.get(url)
|
||||
#解析结果
|
||||
if not authRequest: #若为空时,返回空
|
||||
return
|
||||
auth=json.loads(authRequest.text)
|
||||
return auth
|
||||
|
||||
print('开始加载数据:custom_events_open_content_page:获取线索事件_open_content_page')
|
||||
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||
|
||||
print('开始请求令牌。')
|
||||
#authRequest=requests.get(authUrl)
|
||||
#auth=json.loads(authRequest.text)
|
||||
auth = get_token(authUrl)
|
||||
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||
i = 0
|
||||
while not auth and i < 60:
|
||||
time.sleep(60)
|
||||
auth = get_token(authUrl)
|
||||
i = i + 1
|
||||
print('开始请求数据总数。')
|
||||
url='https://api.huiju.cool/v2/customerEvents'
|
||||
header={}
|
||||
body={'access_token':auth['access_token'],'limit':'2000','event':'open_content_page','sort':'date','date[ge]':formatted_previous_date,'date[le]':formatted_current_date,}
|
||||
dataReqL=requests.get(url,headers=header,params=body)
|
||||
resL=json.loads(dataReqL.text)
|
||||
# print(resL)
|
||||
dataList=resL['data']
|
||||
total=len(dataList)
|
||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||
host="172.17.0.8", port="5432")
|
||||
print('数据库连接成功')
|
||||
dataId=str(uuid.uuid4())
|
||||
print('临时id:'+dataId)
|
||||
json_object = json.dumps(dataList)
|
||||
cur=conn.cursor()
|
||||
sql="update data_api.api_data set is_loaded = '1' where api_id = '43edfb13-8a42-4152-9dc8-d5feb3c8';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||
cur.execute(sql,[dataId,'43edfb13-8a42-4152-9dc8-d5feb3c8',json_object,total])
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
print('加载数据结束:custom_events_open_content_page:获取线索事件_open_content_page')
|
|
@ -0,0 +1,102 @@
|
|||
/*******Main Section**************************************************************************/
|
||||
\set ON_ERROR_STOP on
|
||||
\set AUTOCOMMIT on
|
||||
\timing on
|
||||
|
||||
DELETE FROM data_api.customer_events_open_content_page;
|
||||
|
||||
insert into data_api.customer_events_open_content_page (
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, last_updated
|
||||
, date
|
||||
, browser
|
||||
, browser_version
|
||||
, customer_id_str
|
||||
, device
|
||||
, external_id
|
||||
, id_str
|
||||
, ip
|
||||
, ip_city
|
||||
, ip_country
|
||||
, ip_province
|
||||
, os
|
||||
, os_version
|
||||
, page_id
|
||||
, page_type
|
||||
, platform
|
||||
, screen_height
|
||||
, screen_width
|
||||
, short_id
|
||||
, target_id
|
||||
, target_name
|
||||
, url
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
|
||||
, case when trim(both from event)='' then null else event::text end event
|
||||
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from browser)='' then null else browser::text end browser
|
||||
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from device)='' then null else device::text end device
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from ip)='' then null else ip::text end ip
|
||||
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
|
||||
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
|
||||
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
|
||||
, case when trim(both from os)='' then null else os::text end os
|
||||
, case when trim(both from os_version)='' then null else os_version::text end os_version
|
||||
, case when trim(both from page_id)='' then null else page_id::text end page_id
|
||||
, case when trim(both from page_type)='' then null else page_type::text end page_type
|
||||
, case when trim(both from platform)='' then null else platform::text end platform
|
||||
, case when trim(both from screen_height)='' then null else screen_height::text end screen_height
|
||||
, case when trim(both from screen_width)='' then null else screen_width::text end screen_width
|
||||
, case when trim(both from short_id)='' then null else short_id::text end short_id
|
||||
, case when trim(both from target_id)='' then null else target_id::text end target_id
|
||||
, case when trim(both from target_name)='' then null else target_name::text end target_name
|
||||
, case when trim(both from url)='' then null else url::text end url
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'browser') browser
|
||||
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'device') device
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'ip') ip
|
||||
, (json_array_elements(data::json)::json->>'ipCity') ip_city
|
||||
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
|
||||
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
|
||||
, (json_array_elements(data::json)::json->>'os') os
|
||||
, (json_array_elements(data::json)::json->>'osVersion') os_version
|
||||
, (json_array_elements(data::json)::json->>'pageId') page_id
|
||||
, (json_array_elements(data::json)::json->>'pageType') page_type
|
||||
, (json_array_elements(data::json)::json->>'platform') platform
|
||||
, (json_array_elements(data::json)::json->>'screenHeight') screen_height
|
||||
, (json_array_elements(data::json)::json->>'screenWidth') screen_width
|
||||
, (json_array_elements(data::json)::json->>'shortId') short_id
|
||||
, (json_array_elements(data::json)::json->>'targetId') target_id
|
||||
, (json_array_elements(data::json)::json->>'targetName') target_name
|
||||
, (json_array_elements(data::json)::json->>'url') url
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='43edfb13-8a42-4152-9dc8-d5feb3c8' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
||||
update data_api.api_data
|
||||
set is_loaded = '1' ,
|
||||
status = '1',
|
||||
request_tm = current_timestamp(0)
|
||||
where api_id='43edfb13-8a42-4152-9dc8-d5feb3c8';
|
||||
\q
|
|
@ -0,0 +1,72 @@
|
|||
# coding: utf-8
|
||||
import requests
|
||||
import json
|
||||
import psycopg2
|
||||
import uuid
|
||||
import datetime
|
||||
import time
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
#全局变量,便于参数使用的预设值
|
||||
current_date = datetime.date.today() # 获取当前日期
|
||||
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||
sign_version = 'v2' # 签名版本号,固定值v2
|
||||
nonce = str(uuid.uuid4())
|
||||
|
||||
#获取签名令牌
|
||||
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||
# 按照指定的格式拼接字符串
|
||||
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||
# 使用SHA256算法计算哈希值
|
||||
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||
return sha256_hash
|
||||
|
||||
#获取鉴权token
|
||||
def get_token(url):
|
||||
#请求鉴权接口
|
||||
authRequest=requests.get(url)
|
||||
#解析结果
|
||||
if not authRequest: #若为空时,返回空
|
||||
return
|
||||
auth=json.loads(authRequest.text)
|
||||
return auth
|
||||
|
||||
print('开始加载数据:custom_events_open_page:获取线索事件_open_page')
|
||||
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||
|
||||
print('开始请求令牌。')
|
||||
#authRequest=requests.get(authUrl)
|
||||
#auth=json.loads(authRequest.text)
|
||||
auth = get_token(authUrl)
|
||||
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||
i = 0
|
||||
while not auth and i < 60:
|
||||
time.sleep(60)
|
||||
auth = get_token(authUrl)
|
||||
i = i + 1
|
||||
print('开始请求数据总数。')
|
||||
url='https://api.huiju.cool/v2/customerEvents'
|
||||
header={}
|
||||
body={'access_token':auth['access_token'],'limit':'2000','sort':'date','event':'open_page','date[le]':formatted_current_date,'date[ge]':formatted_previous_date,}
|
||||
dataReqL=requests.get(url,headers=header,params=body)
|
||||
resL=json.loads(dataReqL.text)
|
||||
# print(resL)
|
||||
dataList=resL['data']
|
||||
total=len(dataList)
|
||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||
host="172.17.0.8", port="5432")
|
||||
print('数据库连接成功')
|
||||
dataId=str(uuid.uuid4())
|
||||
print('临时id:'+dataId)
|
||||
json_object = json.dumps(dataList)
|
||||
cur=conn.cursor()
|
||||
sql="update data_api.api_data set is_loaded = '1' where api_id = 'a2b284c3-322f-4bc0-89ff-414aa66a';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||
cur.execute(sql,[dataId,'a2b284c3-322f-4bc0-89ff-414aa66a',json_object,total])
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
print('加载数据结束:custom_events_open_page:获取线索事件_open_page')
|
|
@ -0,0 +1,123 @@
|
|||
/*******Main Section**************************************************************************/
|
||||
\set ON_ERROR_STOP on
|
||||
\set AUTOCOMMIT on
|
||||
\timing on
|
||||
|
||||
DELETE FROM data_api.customer_events_open_page;
|
||||
|
||||
insert into data_api.customer_events_open_page (
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, last_updated
|
||||
, date
|
||||
, browser
|
||||
, browser_version
|
||||
, campaign
|
||||
, customer_id_str
|
||||
, device
|
||||
, domain
|
||||
, external_id
|
||||
, id_str
|
||||
, ip
|
||||
, ip_city
|
||||
, ip_country
|
||||
, ip_county
|
||||
, ip_province
|
||||
, target_name
|
||||
, os
|
||||
, os_version
|
||||
, page_id
|
||||
, page_type
|
||||
, path
|
||||
, platform
|
||||
, ref_domain
|
||||
, root_domain
|
||||
, score
|
||||
, screen_height
|
||||
, screen_width
|
||||
, short_id
|
||||
, target_id
|
||||
, url
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
|
||||
, case when trim(both from event)='' then null else event::text end event
|
||||
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from browser)='' then null else browser::text end browser
|
||||
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
|
||||
, case when trim(both from campaign)='' then null else campaign::text end campaign
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from device)='' then null else device::text end device
|
||||
, case when trim(both from domain)='' then null else domain::text end domain
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from ip)='' then null else ip::text end ip
|
||||
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
|
||||
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
|
||||
, case when trim(both from ip_county)='' then null else ip_county::text end ip_county
|
||||
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
|
||||
, case when trim(both from target_name)='' then null else target_name::text end target_name
|
||||
, case when trim(both from os)='' then null else os::text end os
|
||||
, case when trim(both from os_version)='' then null else os_version::text end os_version
|
||||
, case when trim(both from page_id)='' then null else page_id::text end page_id
|
||||
, case when trim(both from page_type)='' then null else page_type::text end page_type
|
||||
, case when trim(both from path)='' then null else path::text end path
|
||||
, case when trim(both from platform)='' then null else platform::text end platform
|
||||
, case when trim(both from ref_domain)='' then null else ref_domain::text end ref_domain
|
||||
, case when trim(both from root_domain)='' then null else root_domain::text end root_domain
|
||||
, case when trim(both from score)='' then null else score::text end score
|
||||
, case when trim(both from screen_height)='' then null else screen_height::text end screen_height
|
||||
, case when trim(both from screen_width)='' then null else screen_width::text end screen_width
|
||||
, case when trim(both from short_id)='' then null else short_id::text end short_id
|
||||
, case when trim(both from target_id)='' then null else target_id::text end target_id
|
||||
, case when trim(both from url)='' then null else url::text end url
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'browser') browser
|
||||
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
|
||||
, (json_array_elements(data::json)::json->>'campaign') campaign
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'device') device
|
||||
, (json_array_elements(data::json)::json->>'domain') domain
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'ip') ip
|
||||
, (json_array_elements(data::json)::json->>'ipCity') ip_city
|
||||
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
|
||||
, (json_array_elements(data::json)::json->>'ipCounty') ip_county
|
||||
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
|
||||
, (json_array_elements(data::json)::json->>'targetName') target_name
|
||||
, (json_array_elements(data::json)::json->>'os') os
|
||||
, (json_array_elements(data::json)::json->>'osVersion') os_version
|
||||
, (json_array_elements(data::json)::json->>'pageId') page_id
|
||||
, (json_array_elements(data::json)::json->>'pageType') page_type
|
||||
, (json_array_elements(data::json)::json->>'path') path
|
||||
, (json_array_elements(data::json)::json->>'platform') platform
|
||||
, (json_array_elements(data::json)::json->>'refDomain') ref_domain
|
||||
, (json_array_elements(data::json)::json->>'rootDomain') root_domain
|
||||
, (json_array_elements(data::json)::json->>'score') score
|
||||
, (json_array_elements(data::json)::json->>'screenHeight') screen_height
|
||||
, (json_array_elements(data::json)::json->>'screenWidth') screen_width
|
||||
, (json_array_elements(data::json)::json->>'shortId') short_id
|
||||
, (json_array_elements(data::json)::json->>'targetId') target_id
|
||||
, (json_array_elements(data::json)::json->>'url') url
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='a2b284c3-322f-4bc0-89ff-414aa66a' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
||||
update data_api.api_data
|
||||
set is_loaded = '1' ,
|
||||
status = '1',
|
||||
request_tm = current_timestamp(0)
|
||||
where api_id='a2b284c3-322f-4bc0-89ff-414aa66a';
|
||||
\q
|
|
@ -0,0 +1,72 @@
|
|||
# coding: utf-8
|
||||
import requests
|
||||
import json
|
||||
import psycopg2
|
||||
import uuid
|
||||
import datetime
|
||||
import time
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
#全局变量,便于参数使用的预设值
|
||||
current_date = datetime.date.today() # 获取当前日期
|
||||
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||
sign_version = 'v2' # 签名版本号,固定值v2
|
||||
nonce = str(uuid.uuid4())
|
||||
|
||||
#获取签名令牌
|
||||
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||
# 按照指定的格式拼接字符串
|
||||
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||
# 使用SHA256算法计算哈希值
|
||||
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||
return sha256_hash
|
||||
|
||||
#获取鉴权token
|
||||
def get_token(url):
|
||||
#请求鉴权接口
|
||||
authRequest=requests.get(url)
|
||||
#解析结果
|
||||
if not authRequest: #若为空时,返回空
|
||||
return
|
||||
auth=json.loads(authRequest.text)
|
||||
return auth
|
||||
|
||||
print('开始加载数据:custom_events_submit_form:获取线索事件_submit_form')
|
||||
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||
|
||||
print('开始请求令牌。')
|
||||
#authRequest=requests.get(authUrl)
|
||||
#auth=json.loads(authRequest.text)
|
||||
auth = get_token(authUrl)
|
||||
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||
i = 0
|
||||
while not auth and i < 60:
|
||||
time.sleep(60)
|
||||
auth = get_token(authUrl)
|
||||
i = i + 1
|
||||
print('开始请求数据总数。')
|
||||
url='https://api.huiju.cool/v2/customerEvents'
|
||||
header={}
|
||||
body={'access_token':auth['access_token'],'event':'submit_form','limit':'2000','sort':'date','date[ge]':formatted_previous_date,'date[le]':formatted_current_date,}
|
||||
dataReqL=requests.get(url,headers=header,params=body)
|
||||
resL=json.loads(dataReqL.text)
|
||||
# print(resL)
|
||||
dataList=resL['data']
|
||||
total=len(dataList)
|
||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||
host="172.17.0.8", port="5432")
|
||||
print('数据库连接成功')
|
||||
dataId=str(uuid.uuid4())
|
||||
print('临时id:'+dataId)
|
||||
json_object = json.dumps(dataList)
|
||||
cur=conn.cursor()
|
||||
sql="update data_api.api_data set is_loaded = '1' where api_id = 'c6e5b45f-35f5-442b-84be-35ff2e76';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||
cur.execute(sql,[dataId,'c6e5b45f-35f5-442b-84be-35ff2e76',json_object,total])
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
print('加载数据结束:custom_events_submit_form:获取线索事件_submit_form')
|
|
@ -0,0 +1,96 @@
|
|||
/*******Main Section**************************************************************************/
|
||||
\set ON_ERROR_STOP on
|
||||
\set AUTOCOMMIT on
|
||||
\timing on
|
||||
|
||||
DELETE FROM data_api.customer_events_submit_form;
|
||||
|
||||
insert into data_api.customer_events_submit_form (
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, last_updated
|
||||
, date
|
||||
, browser
|
||||
, browser_version
|
||||
, customer_id_str
|
||||
, external_id
|
||||
, id_str
|
||||
, ip
|
||||
, ip_city
|
||||
, ip_country
|
||||
, ip_county
|
||||
, ip_province
|
||||
, os
|
||||
, os_version
|
||||
, page_id
|
||||
, page_type
|
||||
, platform
|
||||
, score
|
||||
, submit_id
|
||||
, target_id
|
||||
, target_name
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
|
||||
, case when trim(both from event)='' then null else event::text end event
|
||||
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from browser)='' then null else browser::text end browser
|
||||
, case when trim(both from browser_version)='' then null else browser_version::text end browser_version
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from ip)='' then null else ip::text end ip
|
||||
, case when trim(both from ip_city)='' then null else ip_city::text end ip_city
|
||||
, case when trim(both from ip_country)='' then null else ip_country::text end ip_country
|
||||
, case when trim(both from ip_county)='' then null else ip_county::text end ip_county
|
||||
, case when trim(both from ip_province)='' then null else ip_province::text end ip_province
|
||||
, case when trim(both from os)='' then null else os::text end os
|
||||
, case when trim(both from os_version)='' then null else os_version::text end os_version
|
||||
, case when trim(both from page_id)='' then null else page_id::text end page_id
|
||||
, case when trim(both from page_type)='' then null else page_type::text end page_type
|
||||
, case when trim(both from platform)='' then null else platform::text end platform
|
||||
, case when trim(both from score)='' then null else score::text end score
|
||||
, case when trim(both from submit_id)='' then null else submit_id::text end submit_id
|
||||
, case when trim(both from target_id)='' then null else target_id::text end target_id
|
||||
, case when trim(both from target_name)='' then null else target_name::text end target_name
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'browser') browser
|
||||
, (json_array_elements(data::json)::json->>'browserVersion') browser_version
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'ip') ip
|
||||
, (json_array_elements(data::json)::json->>'ipCity') ip_city
|
||||
, (json_array_elements(data::json)::json->>'ipCountry') ip_country
|
||||
, (json_array_elements(data::json)::json->>'ipCounty') ip_county
|
||||
, (json_array_elements(data::json)::json->>'ipProvince') ip_province
|
||||
, (json_array_elements(data::json)::json->>'os') os
|
||||
, (json_array_elements(data::json)::json->>'osVersion') os_version
|
||||
, (json_array_elements(data::json)::json->>'pageId') page_id
|
||||
, (json_array_elements(data::json)::json->>'pageType') page_type
|
||||
, (json_array_elements(data::json)::json->>'platform') platform
|
||||
, (json_array_elements(data::json)::json->>'score') score
|
||||
, (json_array_elements(data::json)::json->>'submitId') submit_id
|
||||
, (json_array_elements(data::json)::json->>'targetId') target_id
|
||||
, (json_array_elements(data::json)::json->>'targetName') target_name
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='c6e5b45f-35f5-442b-84be-35ff2e76' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
||||
update data_api.api_data
|
||||
set is_loaded = '1' ,
|
||||
status = '1',
|
||||
request_tm = current_timestamp(0)
|
||||
where api_id='c6e5b45f-35f5-442b-84be-35ff2e76';
|
||||
\q
|
|
@ -0,0 +1,72 @@
|
|||
# coding: utf-8
|
||||
import requests
|
||||
import json
|
||||
import psycopg2
|
||||
import uuid
|
||||
import datetime
|
||||
import time
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
#全局变量,便于参数使用的预设值
|
||||
current_date = datetime.date.today() # 获取当前日期
|
||||
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||
sign_version = 'v2' # 签名版本号,固定值v2
|
||||
nonce = str(uuid.uuid4())
|
||||
|
||||
#获取签名令牌
|
||||
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||
# 按照指定的格式拼接字符串
|
||||
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||
# 使用SHA256算法计算哈希值
|
||||
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||
return sha256_hash
|
||||
|
||||
#获取鉴权token
|
||||
def get_token(url):
|
||||
#请求鉴权接口
|
||||
authRequest=requests.get(url)
|
||||
#解析结果
|
||||
if not authRequest: #若为空时,返回空
|
||||
return
|
||||
auth=json.loads(authRequest.text)
|
||||
return auth
|
||||
|
||||
print('开始加载数据:custom_events_subscribe:获取线索事件_subscribe')
|
||||
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||
|
||||
print('开始请求令牌。')
|
||||
#authRequest=requests.get(authUrl)
|
||||
#auth=json.loads(authRequest.text)
|
||||
auth = get_token(authUrl)
|
||||
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||
i = 0
|
||||
while not auth and i < 60:
|
||||
time.sleep(60)
|
||||
auth = get_token(authUrl)
|
||||
i = i + 1
|
||||
print('开始请求数据总数。')
|
||||
url='https://api.huiju.cool/v2/customerEvents'
|
||||
header={}
|
||||
body={'access_token':auth['access_token'],'limit':'','event':'subscribe','sort':'date','date[ge]':formatted_previous_date,'date[le]':formatted_current_date,}
|
||||
dataReqL=requests.get(url,headers=header,params=body)
|
||||
resL=json.loads(dataReqL.text)
|
||||
# print(resL)
|
||||
dataList=resL['data']
|
||||
total=len(dataList)
|
||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||
host="172.17.0.8", port="5432")
|
||||
print('数据库连接成功')
|
||||
dataId=str(uuid.uuid4())
|
||||
print('临时id:'+dataId)
|
||||
json_object = json.dumps(dataList)
|
||||
cur=conn.cursor()
|
||||
sql="update data_api.api_data set is_loaded = '1' where api_id = '36308871-eea2-49b0-8268-03ab68e3';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||
cur.execute(sql,[dataId,'36308871-eea2-49b0-8268-03ab68e3',json_object,total])
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
print('加载数据结束:custom_events_subscribe:获取线索事件_subscribe')
|
|
@ -0,0 +1,60 @@
|
|||
/*******Main Section**************************************************************************/
|
||||
\set ON_ERROR_STOP on
|
||||
\set AUTOCOMMIT on
|
||||
\timing on
|
||||
|
||||
DELETE FROM data_api.customer_events_subscribe;
|
||||
|
||||
insert into data_api.customer_events_subscribe (
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, last_updated
|
||||
, date
|
||||
, target_name
|
||||
, channel_account
|
||||
, customer_id_str
|
||||
, external_id
|
||||
, id_str
|
||||
, score
|
||||
, target_id
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
|
||||
, case when trim(both from event)='' then null else event::text end event
|
||||
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from target_name)='' then null else target_name::text end target_name
|
||||
, case when trim(both from channel_account)='' then null else channel_account::text end channel_account
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from score)='' then null else score::text end score
|
||||
, case when trim(both from target_id)='' then null else target_id::text end target_id
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'targetName') target_name
|
||||
, (json_array_elements(data::json)::json->>'channelAccount') channel_account
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'score') score
|
||||
, (json_array_elements(data::json)::json->>'targetId') target_id
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='36308871-eea2-49b0-8268-03ab68e3' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
||||
update data_api.api_data
|
||||
set is_loaded = '1' ,
|
||||
status = '1',
|
||||
request_tm = current_timestamp(0)
|
||||
where api_id='36308871-eea2-49b0-8268-03ab68e3';
|
||||
\q
|
|
@ -0,0 +1,72 @@
|
|||
# coding: utf-8
|
||||
import requests
|
||||
import json
|
||||
import psycopg2
|
||||
import uuid
|
||||
import datetime
|
||||
import time
|
||||
import hashlib
|
||||
import time
|
||||
|
||||
#全局变量,便于参数使用的预设值
|
||||
current_date = datetime.date.today() # 获取当前日期
|
||||
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||
sign_version = 'v2' # 签名版本号,固定值v2
|
||||
nonce = str(uuid.uuid4())
|
||||
|
||||
#获取签名令牌
|
||||
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||
# 按照指定的格式拼接字符串
|
||||
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||
# 使用SHA256算法计算哈希值
|
||||
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||
return sha256_hash
|
||||
|
||||
#获取鉴权token
|
||||
def get_token(url):
|
||||
#请求鉴权接口
|
||||
authRequest=requests.get(url)
|
||||
#解析结果
|
||||
if not authRequest: #若为空时,返回空
|
||||
return
|
||||
auth=json.loads(authRequest.text)
|
||||
return auth
|
||||
|
||||
print('开始加载数据:custom_events_wechat_scan:获取线索事件_wechat_scan')
|
||||
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||
|
||||
print('开始请求令牌。')
|
||||
#authRequest=requests.get(authUrl)
|
||||
#auth=json.loads(authRequest.text)
|
||||
auth = get_token(authUrl)
|
||||
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||
i = 0
|
||||
while not auth and i < 60:
|
||||
time.sleep(60)
|
||||
auth = get_token(authUrl)
|
||||
i = i + 1
|
||||
print('开始请求数据总数。')
|
||||
url='https://api.huiju.cool/v2/customerEvents'
|
||||
header={}
|
||||
body={'access_token':auth['access_token'],'limit':'2000','sort':'date','event':'wechat_scan','date[le]':formatted_current_date,'date[ge]':formatted_previous_date,}
|
||||
dataReqL=requests.get(url,headers=header,params=body)
|
||||
resL=json.loads(dataReqL.text)
|
||||
# print(resL)
|
||||
dataList=resL['data']
|
||||
total=len(dataList)
|
||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||
host="172.17.0.8", port="5432")
|
||||
print('数据库连接成功')
|
||||
dataId=str(uuid.uuid4())
|
||||
print('临时id:'+dataId)
|
||||
json_object = json.dumps(dataList)
|
||||
cur=conn.cursor()
|
||||
sql="update data_api.api_data set is_loaded = '1' where api_id = '14f55a0e-1c01-4bd5-9eec-7e0d0bdf';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||
cur.execute(sql,[dataId,'14f55a0e-1c01-4bd5-9eec-7e0d0bdf',json_object,total])
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
print('加载数据结束:custom_events_wechat_scan:获取线索事件_wechat_scan')
|
|
@ -0,0 +1,60 @@
|
|||
/*******Main Section**************************************************************************/
|
||||
\set ON_ERROR_STOP on
|
||||
\set AUTOCOMMIT on
|
||||
\timing on
|
||||
|
||||
DELETE FROM data_api.customer_events_wechat_scan;
|
||||
|
||||
insert into data_api.customer_events_wechat_scan (
|
||||
id
|
||||
, customer_id
|
||||
, event
|
||||
, last_updated
|
||||
, date
|
||||
, channel_account
|
||||
, customer_id_str
|
||||
, external_id
|
||||
, id_str
|
||||
, score
|
||||
, target_id
|
||||
, target_name
|
||||
,etl_tx_dt
|
||||
)
|
||||
select
|
||||
case when trim(both from id)='' then null else id::text end id
|
||||
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
|
||||
, case when trim(both from event)='' then null else event::text end event
|
||||
, case when trim(both from last_updated)='' then null else last_updated::text end last_updated
|
||||
, case when trim(both from date)='' then null else date::text end date
|
||||
, case when trim(both from channel_account)='' then null else channel_account::text end channel_account
|
||||
, case when trim(both from customer_id_str)='' then null else customer_id_str::text end customer_id_str
|
||||
, case when trim(both from external_id)='' then null else external_id::text end external_id
|
||||
, case when trim(both from id_str)='' then null else id_str::text end id_str
|
||||
, case when trim(both from score)='' then null else score::text end score
|
||||
, case when trim(both from target_id)='' then null else target_id::text end target_id
|
||||
, case when trim(both from target_name)='' then null else target_name::text end target_name
|
||||
,etl_tx_dt
|
||||
from (
|
||||
select
|
||||
(json_array_elements(data::json)::json->>'id') id
|
||||
, (json_array_elements(data::json)::json->>'customerId') customer_id
|
||||
, (json_array_elements(data::json)::json->>'event') event
|
||||
, (json_array_elements(data::json)::json->>'lastUpdated') last_updated
|
||||
, (json_array_elements(data::json)::json->>'date') date
|
||||
, (json_array_elements(data::json)::json->>'channelAccount') channel_account
|
||||
, (json_array_elements(data::json)::json->>'customerIdStr') customer_id_str
|
||||
, (json_array_elements(data::json)::json->>'externalId') external_id
|
||||
, (json_array_elements(data::json)::json->>'idStr') id_str
|
||||
, (json_array_elements(data::json)::json->>'score') score
|
||||
, (json_array_elements(data::json)::json->>'targetId') target_id
|
||||
, (json_array_elements(data::json)::json->>'targetName') target_name
|
||||
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||
from (select * from data_api.api_data
|
||||
WHERE api_id='14f55a0e-1c01-4bd5-9eec-7e0d0bdf' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||
|
||||
update data_api.api_data
|
||||
set is_loaded = '1' ,
|
||||
status = '1',
|
||||
request_tm = current_timestamp(0)
|
||||
where api_id='14f55a0e-1c01-4bd5-9eec-7e0d0bdf';
|
||||
\q
|
Loading…
Reference in New Issue