add workflow 市场易API,dev
This commit is contained in:
parent
1b762b7c9e
commit
c6f7942089
|
@ -0,0 +1,34 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
delete from p10_sa.S98_S_api_contact_created
|
||||||
|
;
|
||||||
|
insert into p10_sa.S98_S_api_contact_created
|
||||||
|
( avatar
|
||||||
|
, id
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
avatar
|
||||||
|
, id
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt
|
||||||
|
from p00_tal.S98_S_api_contact_created
|
||||||
|
;
|
||||||
|
delete from p12_sfull.S98_S_api_contact_created
|
||||||
|
;
|
||||||
|
;
|
||||||
|
insert into p12_sfull.S98_S_api_contact_created
|
||||||
|
( avatar
|
||||||
|
, id
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
avatar
|
||||||
|
, id
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt
|
||||||
|
from p10_sa.S98_S_api_contact_created
|
||||||
|
;
|
||||||
|
\q
|
|
@ -0,0 +1,15 @@
|
||||||
|
|
||||||
|
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_api_contact_created (
|
||||||
|
avatar TEXT
|
||||||
|
, id TEXT
|
||||||
|
, properties TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'api_contact_created' );
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
|
||||||
|
create table if not exists p10_sa.S98_S_api_contact_created (
|
||||||
|
avatar TEXT
|
||||||
|
, id TEXT
|
||||||
|
, properties TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_created.avatar IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_created.id IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_created.properties IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_created.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p10_sa.S98_S_api_contact_created IS '';
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
create table if not exists p12_sfull.S98_S_api_contact_created (
|
||||||
|
avatar TEXT
|
||||||
|
, id TEXT
|
||||||
|
, properties TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_created.avatar IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_created.id IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_created.properties IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_created.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p12_sfull.S98_S_api_contact_created IS '';
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
delete from p10_sa.S98_S_api_contact_event_meta
|
||||||
|
;
|
||||||
|
insert into p10_sa.S98_S_api_contact_event_meta
|
||||||
|
( id
|
||||||
|
, name
|
||||||
|
, remark
|
||||||
|
, type
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
id
|
||||||
|
, name
|
||||||
|
, remark
|
||||||
|
, type
|
||||||
|
, etl_tx_dt
|
||||||
|
from p00_tal.S98_S_api_contact_event_meta
|
||||||
|
;
|
||||||
|
delete from p12_sfull.S98_S_api_contact_event_meta
|
||||||
|
;
|
||||||
|
;
|
||||||
|
insert into p12_sfull.S98_S_api_contact_event_meta
|
||||||
|
( id
|
||||||
|
, name
|
||||||
|
, remark
|
||||||
|
, type
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
id
|
||||||
|
, name
|
||||||
|
, remark
|
||||||
|
, type
|
||||||
|
, etl_tx_dt
|
||||||
|
from p10_sa.S98_S_api_contact_event_meta
|
||||||
|
;
|
||||||
|
\q
|
|
@ -0,0 +1,16 @@
|
||||||
|
|
||||||
|
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_api_contact_event_meta (
|
||||||
|
id TEXT
|
||||||
|
, name TEXT
|
||||||
|
, remark TEXT
|
||||||
|
, type TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'api_contact_event_meta' );
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
|
||||||
|
create table if not exists p10_sa.S98_S_api_contact_event_meta (
|
||||||
|
id TEXT
|
||||||
|
, name TEXT
|
||||||
|
, remark TEXT
|
||||||
|
, type TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_event_meta.id IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_event_meta.name IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_event_meta.remark IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_event_meta.type IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_event_meta.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p10_sa.S98_S_api_contact_event_meta IS '';
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
create table if not exists p12_sfull.S98_S_api_contact_event_meta (
|
||||||
|
id TEXT
|
||||||
|
, name TEXT
|
||||||
|
, remark TEXT
|
||||||
|
, type TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_event_meta.id IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_event_meta.name IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_event_meta.remark IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_event_meta.type IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_event_meta.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p12_sfull.S98_S_api_contact_event_meta IS '';
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
delete from p10_sa.S98_S_api_contact_events
|
||||||
|
;
|
||||||
|
insert into p10_sa.S98_S_api_contact_events
|
||||||
|
( at
|
||||||
|
, channel_id
|
||||||
|
, channel_name
|
||||||
|
, child_events
|
||||||
|
, connect_id
|
||||||
|
, connect_name
|
||||||
|
, id
|
||||||
|
, meta_id
|
||||||
|
, meta_name
|
||||||
|
, meta_remark
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
at
|
||||||
|
, channel_id
|
||||||
|
, channel_name
|
||||||
|
, child_events
|
||||||
|
, connect_id
|
||||||
|
, connect_name
|
||||||
|
, id
|
||||||
|
, meta_id
|
||||||
|
, meta_name
|
||||||
|
, meta_remark
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt
|
||||||
|
from p00_tal.S98_S_api_contact_events
|
||||||
|
;
|
||||||
|
delete from p12_sfull.S98_S_api_contact_events
|
||||||
|
;
|
||||||
|
;
|
||||||
|
insert into p12_sfull.S98_S_api_contact_events
|
||||||
|
( at
|
||||||
|
, channel_id
|
||||||
|
, channel_name
|
||||||
|
, child_events
|
||||||
|
, connect_id
|
||||||
|
, connect_name
|
||||||
|
, id
|
||||||
|
, meta_id
|
||||||
|
, meta_name
|
||||||
|
, meta_remark
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
at
|
||||||
|
, channel_id
|
||||||
|
, channel_name
|
||||||
|
, child_events
|
||||||
|
, connect_id
|
||||||
|
, connect_name
|
||||||
|
, id
|
||||||
|
, meta_id
|
||||||
|
, meta_name
|
||||||
|
, meta_remark
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt
|
||||||
|
from p10_sa.S98_S_api_contact_events
|
||||||
|
;
|
||||||
|
\q
|
|
@ -0,0 +1,23 @@
|
||||||
|
|
||||||
|
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_api_contact_events (
|
||||||
|
at TEXT
|
||||||
|
, channel_id TEXT
|
||||||
|
, channel_name TEXT
|
||||||
|
, child_events TEXT
|
||||||
|
, connect_id TEXT
|
||||||
|
, connect_name TEXT
|
||||||
|
, id TEXT
|
||||||
|
, meta_id TEXT
|
||||||
|
, meta_name TEXT
|
||||||
|
, meta_remark TEXT
|
||||||
|
, properties TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'api_contact_events' );
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
|
||||||
|
create table if not exists p10_sa.S98_S_api_contact_events (
|
||||||
|
at TEXT
|
||||||
|
, channel_id TEXT
|
||||||
|
, channel_name TEXT
|
||||||
|
, child_events TEXT
|
||||||
|
, connect_id TEXT
|
||||||
|
, connect_name TEXT
|
||||||
|
, id TEXT
|
||||||
|
, meta_id TEXT
|
||||||
|
, meta_name TEXT
|
||||||
|
, meta_remark TEXT
|
||||||
|
, properties TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.at IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.channel_id IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.channel_name IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.child_events IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.connect_id IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.connect_name IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.id IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.meta_id IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.meta_name IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.meta_remark IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.properties IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p10_sa.S98_S_api_contact_events IS '';
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
create table if not exists p12_sfull.S98_S_api_contact_events (
|
||||||
|
at TEXT
|
||||||
|
, channel_id TEXT
|
||||||
|
, channel_name TEXT
|
||||||
|
, child_events TEXT
|
||||||
|
, connect_id TEXT
|
||||||
|
, connect_name TEXT
|
||||||
|
, id TEXT
|
||||||
|
, meta_id TEXT
|
||||||
|
, meta_name TEXT
|
||||||
|
, meta_remark TEXT
|
||||||
|
, properties TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.at IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.channel_id IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.channel_name IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.child_events IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.connect_id IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.connect_name IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.id IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.meta_id IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.meta_name IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.meta_remark IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.properties IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p12_sfull.S98_S_api_contact_events IS '';
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
delete from p10_sa.S98_S_api_contact_fields
|
||||||
|
;
|
||||||
|
insert into p10_sa.S98_S_api_contact_fields
|
||||||
|
( data_type
|
||||||
|
, id
|
||||||
|
, is_enabled
|
||||||
|
, name
|
||||||
|
, remark
|
||||||
|
, sort
|
||||||
|
, type
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
data_type
|
||||||
|
, id
|
||||||
|
, is_enabled
|
||||||
|
, name
|
||||||
|
, remark
|
||||||
|
, sort
|
||||||
|
, type
|
||||||
|
, etl_tx_dt
|
||||||
|
from p00_tal.S98_S_api_contact_fields
|
||||||
|
;
|
||||||
|
delete from p12_sfull.S98_S_api_contact_fields
|
||||||
|
;
|
||||||
|
;
|
||||||
|
insert into p12_sfull.S98_S_api_contact_fields
|
||||||
|
( data_type
|
||||||
|
, id
|
||||||
|
, is_enabled
|
||||||
|
, name
|
||||||
|
, remark
|
||||||
|
, sort
|
||||||
|
, type
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
data_type
|
||||||
|
, id
|
||||||
|
, is_enabled
|
||||||
|
, name
|
||||||
|
, remark
|
||||||
|
, sort
|
||||||
|
, type
|
||||||
|
, etl_tx_dt
|
||||||
|
from p10_sa.S98_S_api_contact_fields
|
||||||
|
;
|
||||||
|
\q
|
|
@ -0,0 +1,19 @@
|
||||||
|
|
||||||
|
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_api_contact_fields (
|
||||||
|
data_type TEXT
|
||||||
|
, id TEXT
|
||||||
|
, is_enabled TEXT
|
||||||
|
, name TEXT
|
||||||
|
, remark TEXT
|
||||||
|
, sort TEXT
|
||||||
|
, type TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'api_contact_fields' );
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
|
||||||
|
create table if not exists p10_sa.S98_S_api_contact_fields (
|
||||||
|
data_type TEXT
|
||||||
|
, id TEXT
|
||||||
|
, is_enabled TEXT
|
||||||
|
, name TEXT
|
||||||
|
, remark TEXT
|
||||||
|
, sort TEXT
|
||||||
|
, type TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.data_type IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.id IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.is_enabled IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.name IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.remark IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.sort IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.type IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p10_sa.S98_S_api_contact_fields IS '';
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
create table if not exists p12_sfull.S98_S_api_contact_fields (
|
||||||
|
data_type TEXT
|
||||||
|
, id TEXT
|
||||||
|
, is_enabled TEXT
|
||||||
|
, name TEXT
|
||||||
|
, remark TEXT
|
||||||
|
, sort TEXT
|
||||||
|
, type TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.data_type IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.id IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.is_enabled IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.name IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.remark IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.sort IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.type IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p12_sfull.S98_S_api_contact_fields IS '';
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
delete from p10_sa.S98_S_api_contact_update_info
|
||||||
|
;
|
||||||
|
insert into p10_sa.S98_S_api_contact_update_info
|
||||||
|
( avatar
|
||||||
|
, id
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
avatar
|
||||||
|
, id
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt
|
||||||
|
from p00_tal.S98_S_api_contact_update_info
|
||||||
|
;
|
||||||
|
delete from p12_sfull.S98_S_api_contact_update_info
|
||||||
|
;
|
||||||
|
;
|
||||||
|
insert into p12_sfull.S98_S_api_contact_update_info
|
||||||
|
( avatar
|
||||||
|
, id
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt )
|
||||||
|
select
|
||||||
|
avatar
|
||||||
|
, id
|
||||||
|
, properties
|
||||||
|
, etl_tx_dt
|
||||||
|
from p10_sa.S98_S_api_contact_update_info
|
||||||
|
;
|
||||||
|
\q
|
|
@ -0,0 +1,15 @@
|
||||||
|
|
||||||
|
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_api_contact_update_info (
|
||||||
|
avatar TEXT
|
||||||
|
, id TEXT
|
||||||
|
, properties TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'api_contact_update_info' );
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
|
||||||
|
create table if not exists p10_sa.S98_S_api_contact_update_info (
|
||||||
|
avatar TEXT
|
||||||
|
, id TEXT
|
||||||
|
, properties TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_update_info.avatar IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_update_info.id IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_update_info.properties IS '';
|
||||||
|
COMMENT ON COLUMN p10_sa.S98_S_api_contact_update_info.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p10_sa.S98_S_api_contact_update_info IS '';
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
create table if not exists p12_sfull.S98_S_api_contact_update_info (
|
||||||
|
avatar TEXT
|
||||||
|
, id TEXT
|
||||||
|
, properties TEXT
|
||||||
|
, etl_tx_dt TIMESTAMP
|
||||||
|
) ;
|
||||||
|
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_update_info.avatar IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_update_info.id IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_update_info.properties IS '';
|
||||||
|
COMMENT ON COLUMN p12_sfull.S98_S_api_contact_update_info.etl_tx_dt IS '';
|
||||||
|
|
||||||
|
COMMENT ON TABLE p12_sfull.S98_S_api_contact_update_info IS '';
|
||||||
|
|
|
@ -0,0 +1,186 @@
|
||||||
|
#!/usr/bin/python
|
||||||
|
# -*- encoding=utf-8 -*-
|
||||||
|
from airflow import DAG
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from airflow.contrib.hooks.ssh_hook import SSHHook
|
||||||
|
from airflow.contrib.operators.ssh_operator import SSHOperator
|
||||||
|
from airflow.sensors.external_task_sensor import ExternalTaskSensor
|
||||||
|
import json
|
||||||
|
|
||||||
|
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
||||||
|
from airflow.operators.email_operator import EmailOperator
|
||||||
|
from airflow.utils.trigger_rule import TriggerRule
|
||||||
|
|
||||||
|
|
||||||
|
sshHook = SSHHook(ssh_conn_id ='ssh_air')
|
||||||
|
default_args = {
|
||||||
|
'owner': 'tek_newsletter@163.com',
|
||||||
|
'email_on_failure': True,
|
||||||
|
'email_on_retry':True,
|
||||||
|
'start_date': datetime(2024, 1, 1),
|
||||||
|
'depends_on_past': False,
|
||||||
|
'retries': 6,
|
||||||
|
'retry_delay': timedelta(minutes=10),
|
||||||
|
}
|
||||||
|
|
||||||
|
dag = DAG('wf_dag_${wf_fold}', default_args=default_args,
|
||||||
|
schedule_interval="0 1 * * *",
|
||||||
|
catchup=False,
|
||||||
|
dagrun_timeout=timedelta(minutes=160),
|
||||||
|
max_active_runs=3)
|
||||||
|
|
||||||
|
task_failed = EmailOperator (
|
||||||
|
dag=dag,
|
||||||
|
trigger_rule=TriggerRule.ONE_FAILED,
|
||||||
|
task_id="task_failed",
|
||||||
|
to=["tek_newsletter@163.com"],
|
||||||
|
cc=[""],
|
||||||
|
subject="${wf_fold}_failed",
|
||||||
|
html_content='<h3>您好,${wf_fold}作业失败,请及时处理" </h3>')
|
||||||
|
|
||||||
|
contact_event_meta_feign = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='contact_event_meta_feign',
|
||||||
|
command='python3 /data/airflow/etl/API/contact_event_meta_feign.py',
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
contact_event_meta_load = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='contact_event_meta_load',
|
||||||
|
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||||
|
params={'my_param':"contact_event_meta_load"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
contact_event_meta_feign >> contact_event_meta_load
|
||||||
|
|
||||||
|
contact_create_list_feign = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='contact_create_list_feign',
|
||||||
|
command='python3 /data/airflow/etl/API/contact_create_list_feign.py',
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
contact_create_list_load = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='contact_create_list_load',
|
||||||
|
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||||
|
params={'my_param':"contact_create_list_load"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
contact_create_list_feign >> contact_create_list_load
|
||||||
|
|
||||||
|
contact_update_feign = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='contact_update_feign',
|
||||||
|
command='python3 /data/airflow/etl/API/contact_update_feign.py',
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
contact_update_load = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='contact_update_load',
|
||||||
|
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||||
|
params={'my_param':"contact_update_load"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
contact_update_feign >> contact_update_load
|
||||||
|
|
||||||
|
api_contact_events_feign = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='api_contact_events_feign',
|
||||||
|
command='python3 /data/airflow/etl/API/api_contact_events_feign.py',
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
api_contact_events_load = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='api_contact_events_load',
|
||||||
|
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||||
|
params={'my_param':"api_contact_events_load"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
api_contact_events_feign >> api_contact_events_load
|
||||||
|
|
||||||
|
contact_fields_feign = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='contact_fields_feign',
|
||||||
|
command='python3 /data/airflow/etl/API/contact_fields_feign.py',
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
contact_fields_load = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='contact_fields_load',
|
||||||
|
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
|
||||||
|
params={'my_param':"contact_fields_load"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
contact_fields_feign >> contact_fields_load
|
||||||
|
|
||||||
|
api_contact_created_3850 = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='api_contact_created_3850',
|
||||||
|
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
|
||||||
|
params={'my_param':"S98_S_api_contact_created"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
api_contact_events_6043 = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='api_contact_events_6043',
|
||||||
|
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
|
||||||
|
params={'my_param':"S98_S_api_contact_events"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
api_contact_fields_4587 = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='api_contact_fields_4587',
|
||||||
|
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
|
||||||
|
params={'my_param':"S98_S_api_contact_fields"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
api_contact_update_info_2218 = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='api_contact_update_info_2218',
|
||||||
|
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
|
||||||
|
params={'my_param':"S98_S_api_contact_update_info"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
api_contact_event_meta_9651 = SSHOperator(
|
||||||
|
ssh_hook=sshHook,
|
||||||
|
task_id='api_contact_event_meta_9651',
|
||||||
|
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
|
||||||
|
params={'my_param':"S98_S_api_contact_event_meta"},
|
||||||
|
depends_on_past=False,
|
||||||
|
retries=3,
|
||||||
|
dag=dag)
|
||||||
|
|
||||||
|
contact_create_list_load >> api_contact_created_3850
|
||||||
|
contact_event_meta_load >> api_contact_event_meta_9651
|
||||||
|
api_contact_events_load >> api_contact_events_6043
|
||||||
|
contact_update_load >> api_contact_update_info_2218
|
||||||
|
contact_fields_load >> api_contact_fields_4587
|
||||||
|
api_contact_fields_4587 >> task_failed
|
|
@ -0,0 +1,126 @@
|
||||||
|
# coding: utf-8
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import datetime as dt
|
||||||
|
import psycopg2
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取指定时间段前的时间
|
||||||
|
:param h: 时间段
|
||||||
|
:return: 时间
|
||||||
|
"""
|
||||||
|
def formatted2_previous_hour(h):
|
||||||
|
if h==0:
|
||||||
|
return dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0)
|
||||||
|
# 减去一个小时,得到前一个小时的开始时间
|
||||||
|
start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h)
|
||||||
|
return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取token
|
||||||
|
:return: token
|
||||||
|
"""
|
||||||
|
def get_token():
|
||||||
|
url = "https://open.cloud.custouch.com/platform/cdp/token"
|
||||||
|
token_payload = {
|
||||||
|
'grant_type':'client_credentials',
|
||||||
|
'app_key':'e9b240eb3a9848e89a96c5e7857794da',
|
||||||
|
'app_secret':'f8cb7069e7dd468888e360bf8c259fc6',
|
||||||
|
'scope':'openid'
|
||||||
|
}
|
||||||
|
headers = {
|
||||||
|
'Content-Type': 'application/x-www-form-urlencoded'
|
||||||
|
}
|
||||||
|
response = requests.request("POST", url, headers=headers, data=token_payload)
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("获取token失败")
|
||||||
|
|
||||||
|
res = json.loads(response.text)
|
||||||
|
return res['access_token']
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取API数据
|
||||||
|
:param token: token
|
||||||
|
:return: contact_ids
|
||||||
|
"""
|
||||||
|
def fetch_data(token):
|
||||||
|
url = "https://open.cloud.custouch.com/platform/cdp/event_meta"
|
||||||
|
params = {}
|
||||||
|
headers = {
|
||||||
|
'Authorization': f'Bearer {token}'
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.request("GET", url, headers=headers, params=params)
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("获取新建联系人ID失败")
|
||||||
|
|
||||||
|
res = json.loads(response.text)
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
PG_DSN = dict(database="dataops_db",
|
||||||
|
user ="dbuser_dba",
|
||||||
|
password="EmBRxnmmjnE3",
|
||||||
|
host ="124.221.232.219",
|
||||||
|
port ="5432")
|
||||||
|
|
||||||
|
|
||||||
|
def save_json_to_pg(data: list, api_id: str) -> None:
|
||||||
|
"""把列表落库:先软删历史,再插入新批次"""
|
||||||
|
print('[save_to_pg] 写入 PG...')
|
||||||
|
sql = """
|
||||||
|
UPDATE data_api.api_data
|
||||||
|
SET is_loaded = '1'
|
||||||
|
WHERE api_id = %s;
|
||||||
|
|
||||||
|
INSERT INTO data_api.api_data
|
||||||
|
(id, api_id, data, total_num, is_loaded, status,
|
||||||
|
request_tm, execute_tm, remark)
|
||||||
|
VALUES (%s, %s, %s, %s, '0', '0',
|
||||||
|
current_timestamp(0), current_timestamp(0), '');
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
with psycopg2.connect(**PG_DSN) as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute(sql,
|
||||||
|
(api_id,
|
||||||
|
str(uuid.uuid4()),
|
||||||
|
api_id,
|
||||||
|
json.dumps(data, ensure_ascii=False),
|
||||||
|
len(data)))
|
||||||
|
conn.commit()
|
||||||
|
cur.close()
|
||||||
|
print('[save_to_pg] 写入完成')
|
||||||
|
except psycopg2.Error as e:
|
||||||
|
print(f'[save_to_pg] 数据库错误: {e}')
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
print(f'[save_to_pg] 未知错误: {e}')
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
if 'conn' in locals():
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""主流程"""
|
||||||
|
# print(get_token())
|
||||||
|
print(f'开始请求新建联系人信息:{formatted2_previous_hour(0)}')
|
||||||
|
token = get_token()
|
||||||
|
# print(token)
|
||||||
|
# 获取新建联系人ID
|
||||||
|
print(f'开始请求新建联系人ID:{formatted2_previous_hour(0)}')
|
||||||
|
objs = fetch_data(token)
|
||||||
|
# 保存联系人ID
|
||||||
|
apiId = 'ae05ffcd-50bb-4b4e-8c2f-a7ddca84'
|
||||||
|
save_json_to_pg(objs, apiId)
|
||||||
|
print(f'结束请求联系人详情:{formatted2_previous_hour(0)}')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
|
||||||
|
DELETE FROM data_api.api_contact_event_meta;
|
||||||
|
|
||||||
|
insert into data_api.api_contact_event_meta (
|
||||||
|
id
|
||||||
|
, name
|
||||||
|
, remark
|
||||||
|
, type
|
||||||
|
,etl_tx_dt
|
||||||
|
)
|
||||||
|
select
|
||||||
|
case when trim(both from id)='' then null else id::text end id
|
||||||
|
, case when trim(both from name)='' then null else name::text end name
|
||||||
|
, case when trim(both from remark)='' then null else remark::text end remark
|
||||||
|
, case when trim(both from type)='' then null else type::text end type
|
||||||
|
,etl_tx_dt
|
||||||
|
from (
|
||||||
|
select
|
||||||
|
(json_array_elements(data::json)::json->>'id') id
|
||||||
|
, (json_array_elements(data::json)::json->>'name') name
|
||||||
|
, (json_array_elements(data::json)::json->>'remark') remark
|
||||||
|
, (json_array_elements(data::json)::json->>'type') type
|
||||||
|
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||||
|
from (select * from data_api.api_data
|
||||||
|
WHERE api_id='ae05ffcd-50bb-4b4e-8c2f-a7ddca84' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||||
|
|
||||||
|
update data_api.api_data
|
||||||
|
set is_loaded = '1' ,
|
||||||
|
status = '1',
|
||||||
|
request_tm = current_timestamp(0)
|
||||||
|
where api_id='ae05ffcd-50bb-4b4e-8c2f-a7ddca84';
|
||||||
|
\q
|
|
@ -0,0 +1,159 @@
|
||||||
|
# coding: utf-8
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import datetime as dt
|
||||||
|
import psycopg2
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取指定时间段前的时间
|
||||||
|
:param h: 时间段
|
||||||
|
:return: 时间
|
||||||
|
"""
|
||||||
|
def formatted2_previous_hour(h):
|
||||||
|
if h==0:
|
||||||
|
return dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0)
|
||||||
|
# 减去一个小时,得到前一个小时的开始时间
|
||||||
|
start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h)
|
||||||
|
return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取token
|
||||||
|
:return: token
|
||||||
|
"""
|
||||||
|
def get_token():
|
||||||
|
url = "https://open.cloud.custouch.com/platform/cdp/token"
|
||||||
|
token_payload = {
|
||||||
|
'grant_type':'client_credentials',
|
||||||
|
'app_key':'e9b240eb3a9848e89a96c5e7857794da',
|
||||||
|
'app_secret':'f8cb7069e7dd468888e360bf8c259fc6',
|
||||||
|
'scope':'openid'
|
||||||
|
}
|
||||||
|
headers = {
|
||||||
|
'Content-Type': 'application/x-www-form-urlencoded'
|
||||||
|
}
|
||||||
|
response = requests.request("POST", url, headers=headers, data=token_payload)
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("获取token失败")
|
||||||
|
|
||||||
|
res = json.loads(response.text)
|
||||||
|
return res['access_token']
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取联系人id
|
||||||
|
:param token: token
|
||||||
|
:return: contact_ids
|
||||||
|
"""
|
||||||
|
def get_contact_ids(token):
|
||||||
|
url = "https://open.cloud.custouch.com/platform/cdp/contact/batch"
|
||||||
|
params = {
|
||||||
|
'start': formatted2_previous_hour(72),
|
||||||
|
'end': formatted2_previous_hour(0),
|
||||||
|
'by':'CreatedAt'
|
||||||
|
}
|
||||||
|
headers = {
|
||||||
|
'Authorization': f'Bearer {token}'
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.request("GET", url, headers=headers, params=params)
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("获取新建联系人ID失败")
|
||||||
|
|
||||||
|
res = json.loads(response.text)
|
||||||
|
return res['model']
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取联系人详情
|
||||||
|
:param token: token
|
||||||
|
:param ids: 联系人id
|
||||||
|
:return: contact_detail
|
||||||
|
"""
|
||||||
|
def get_contact_detail(token, ids):
|
||||||
|
url = "https://open.cloud.custouch.com/platform/cdp/contact/batch/properties"
|
||||||
|
payload = json.dumps(ids)
|
||||||
|
headers = {
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'Authorization': f'Bearer {token}'
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.request("POST", url, headers=headers, data=payload)
|
||||||
|
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("获取新建联系人详情失败")
|
||||||
|
|
||||||
|
res = json.loads(response.text)
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
PG_DSN = dict(database="dataops_db",
|
||||||
|
user ="dbuser_dba",
|
||||||
|
password="EmBRxnmmjnE3",
|
||||||
|
host ="124.221.232.219",
|
||||||
|
port ="5432")
|
||||||
|
|
||||||
|
|
||||||
|
def save_json_to_pg(data: list, api_id: str) -> None:
|
||||||
|
"""把列表落库:先软删历史,再插入新批次"""
|
||||||
|
print('[save_to_pg] 写入 PG...')
|
||||||
|
sql = """
|
||||||
|
UPDATE data_api.api_data
|
||||||
|
SET is_loaded = '1'
|
||||||
|
WHERE api_id = %s;
|
||||||
|
|
||||||
|
INSERT INTO data_api.api_data
|
||||||
|
(id, api_id, data, total_num, is_loaded, status,
|
||||||
|
request_tm, execute_tm, remark)
|
||||||
|
VALUES (%s, %s, %s, %s, '0', '0',
|
||||||
|
current_timestamp(0), current_timestamp(0), '');
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
with psycopg2.connect(**PG_DSN) as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute(sql,
|
||||||
|
(api_id,
|
||||||
|
str(uuid.uuid4()),
|
||||||
|
api_id,
|
||||||
|
json.dumps(data, ensure_ascii=False),
|
||||||
|
len(data)))
|
||||||
|
conn.commit()
|
||||||
|
cur.close()
|
||||||
|
print('[save_to_pg] 写入完成')
|
||||||
|
except psycopg2.Error as e:
|
||||||
|
print(f'[save_to_pg] 数据库错误: {e}')
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
print(f'[save_to_pg] 未知错误: {e}')
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
if 'conn' in locals():
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""主流程"""
|
||||||
|
# print(get_token()
|
||||||
|
print(f'开始请求新建联系人信息:{formatted2_previous_hour(0)}')
|
||||||
|
token = get_token()
|
||||||
|
# print(token)
|
||||||
|
# 获取新建联系人ID
|
||||||
|
print(f'开始请求新建联系人ID:{formatted2_previous_hour(0)}')
|
||||||
|
ids = get_contact_ids(token)
|
||||||
|
# 保存联系人ID
|
||||||
|
apiId_ids = 'c54b8459-77ae-4ab2-8e34-01897c2d'
|
||||||
|
save_json_to_pg(ids, apiId_ids)
|
||||||
|
# print(ids)
|
||||||
|
# 获取联系人详情
|
||||||
|
print(f'开始请求联系人详情:{formatted2_previous_hour(0)}')
|
||||||
|
objs = get_contact_detail(token, ids)
|
||||||
|
# 保存联系人详情
|
||||||
|
apiId = 'ef261acf-92e8-414f-ae92-c0764164'
|
||||||
|
save_json_to_pg(objs, apiId)
|
||||||
|
print(f'结束请求联系人详情:{formatted2_previous_hour(0)}')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
|
||||||
|
DELETE FROM data_api.api_contact_created;
|
||||||
|
|
||||||
|
insert into data_api.api_contact_created (
|
||||||
|
avatar
|
||||||
|
, id
|
||||||
|
, properties
|
||||||
|
,etl_tx_dt
|
||||||
|
)
|
||||||
|
select
|
||||||
|
case when trim(both from avatar)='' then null else avatar::text end avatar
|
||||||
|
, case when trim(both from id)='' then null else id::text end id
|
||||||
|
, case when trim(both from properties)='' then null else properties::text end properties
|
||||||
|
,etl_tx_dt
|
||||||
|
from (
|
||||||
|
select
|
||||||
|
(json_array_elements(data::json)::json->>'avatar') avatar
|
||||||
|
, (json_array_elements(data::json)::json->>'id') id
|
||||||
|
, (json_array_elements(data::json)::json->>'properties') properties
|
||||||
|
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||||
|
from (select * from data_api.api_data
|
||||||
|
WHERE api_id='ef261acf-92e8-414f-ae92-c0764164' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||||
|
|
||||||
|
update data_api.api_data
|
||||||
|
set is_loaded = '1' ,
|
||||||
|
status = '1',
|
||||||
|
request_tm = current_timestamp(0)
|
||||||
|
where api_id='ef261acf-92e8-414f-ae92-c0764164';
|
||||||
|
\q
|
|
@ -0,0 +1,159 @@
|
||||||
|
# coding: utf-8
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import datetime as dt
|
||||||
|
import psycopg2
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取指定时间段前的时间
|
||||||
|
:param h: 时间段
|
||||||
|
:return: 时间
|
||||||
|
"""
|
||||||
|
def formatted2_previous_hour(h):
|
||||||
|
if h==0:
|
||||||
|
return dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0)
|
||||||
|
# 减去一个小时,得到前一个小时的开始时间
|
||||||
|
start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h)
|
||||||
|
return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取token
|
||||||
|
:return: token
|
||||||
|
"""
|
||||||
|
def get_token():
|
||||||
|
url = "https://open.cloud.custouch.com/platform/cdp/token"
|
||||||
|
token_payload = {
|
||||||
|
'grant_type':'client_credentials',
|
||||||
|
'app_key':'e9b240eb3a9848e89a96c5e7857794da',
|
||||||
|
'app_secret':'f8cb7069e7dd468888e360bf8c259fc6',
|
||||||
|
'scope':'openid'
|
||||||
|
}
|
||||||
|
headers = {
|
||||||
|
'Content-Type': 'application/x-www-form-urlencoded'
|
||||||
|
}
|
||||||
|
response = requests.request("POST", url, headers=headers, data=token_payload)
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("获取token失败")
|
||||||
|
|
||||||
|
res = json.loads(response.text)
|
||||||
|
return res['access_token']
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取联系人id
|
||||||
|
:param token: token
|
||||||
|
:return: contact_ids
|
||||||
|
"""
|
||||||
|
def get_contact_ids(token):
|
||||||
|
url = "https://open.cloud.custouch.com/platform/cdp/contact/batch"
|
||||||
|
params = {
|
||||||
|
'start': formatted2_previous_hour(72),
|
||||||
|
'end': formatted2_previous_hour(0),
|
||||||
|
'by':'UpdatedAt',
|
||||||
|
}
|
||||||
|
headers = {
|
||||||
|
'Authorization': f'Bearer {token}'
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.request("GET", url, headers=headers, params=params)
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("获取新建联系人ID失败")
|
||||||
|
|
||||||
|
res = json.loads(response.text)
|
||||||
|
return res['model']
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取联系人详情
|
||||||
|
:param token: token
|
||||||
|
:param ids: 联系人id
|
||||||
|
:return: contact_detail
|
||||||
|
"""
|
||||||
|
def get_contact_detail(token, ids):
|
||||||
|
url = "https://open.cloud.custouch.com/platform/cdp/contact/batch/properties"
|
||||||
|
payload = json.dumps(ids)
|
||||||
|
headers = {
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'Authorization': f'Bearer {token}'
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.request("POST", url, headers=headers, data=payload)
|
||||||
|
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("获取新建联系人详情失败")
|
||||||
|
|
||||||
|
res = json.loads(response.text)
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
PG_DSN = dict(database="dataops_db",
|
||||||
|
user ="dbuser_dba",
|
||||||
|
password="EmBRxnmmjnE3",
|
||||||
|
host ="124.221.232.219",
|
||||||
|
port ="5432")
|
||||||
|
|
||||||
|
|
||||||
|
def save_json_to_pg(data: list, api_id: str) -> None:
|
||||||
|
"""把列表落库:先软删历史,再插入新批次"""
|
||||||
|
print('[save_to_pg] 写入 PG...')
|
||||||
|
sql = """
|
||||||
|
UPDATE data_api.api_data
|
||||||
|
SET is_loaded = '1'
|
||||||
|
WHERE api_id = %s;
|
||||||
|
|
||||||
|
INSERT INTO data_api.api_data
|
||||||
|
(id, api_id, data, total_num, is_loaded, status,
|
||||||
|
request_tm, execute_tm, remark)
|
||||||
|
VALUES (%s, %s, %s, %s, '0', '0',
|
||||||
|
current_timestamp(0), current_timestamp(0), '');
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
with psycopg2.connect(**PG_DSN) as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute(sql,
|
||||||
|
(api_id,
|
||||||
|
str(uuid.uuid4()),
|
||||||
|
api_id,
|
||||||
|
json.dumps(data, ensure_ascii=False),
|
||||||
|
len(data)))
|
||||||
|
conn.commit()
|
||||||
|
cur.close()
|
||||||
|
print('[save_to_pg] 写入完成')
|
||||||
|
except psycopg2.Error as e:
|
||||||
|
print(f'[save_to_pg] 数据库错误: {e}')
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
print(f'[save_to_pg] 未知错误: {e}')
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
if 'conn' in locals():
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""主流程"""
|
||||||
|
# print(get_token()
|
||||||
|
print(f'开始请求新建联系人信息:{formatted2_previous_hour(0)}')
|
||||||
|
token = get_token()
|
||||||
|
# print(token)
|
||||||
|
# 获取新建联系人ID
|
||||||
|
print(f'开始请求新建联系人ID:{formatted2_previous_hour(0)}')
|
||||||
|
ids = get_contact_ids(token)
|
||||||
|
# 保存联系人ID
|
||||||
|
apiId_ids = 'e8071e23-0ced-4ba9-8cac-fd7828df'
|
||||||
|
save_json_to_pg(ids, apiId_ids)
|
||||||
|
# print(ids)
|
||||||
|
# 获取联系人详情
|
||||||
|
print(f'开始请求联系人详情:{formatted2_previous_hour(0)}')
|
||||||
|
objs = get_contact_detail(token, ids)
|
||||||
|
# 保存联系人详情
|
||||||
|
apiId = 'ff6f11b4-d49c-47d8-b870-7082e438'
|
||||||
|
save_json_to_pg(objs, apiId)
|
||||||
|
print(f'结束请求联系人详情:{formatted2_previous_hour(0)}')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
|
@ -0,0 +1,33 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
|
||||||
|
DELETE FROM data_api.api_contact_update_info;
|
||||||
|
|
||||||
|
insert into data_api.api_contact_update_info (
|
||||||
|
avatar
|
||||||
|
, id
|
||||||
|
, properties
|
||||||
|
,etl_tx_dt
|
||||||
|
)
|
||||||
|
select
|
||||||
|
case when trim(both from avatar)='' then null else avatar::text end avatar
|
||||||
|
, case when trim(both from id)='' then null else id::text end id
|
||||||
|
, case when trim(both from properties)='' then null else properties::text end properties
|
||||||
|
,etl_tx_dt
|
||||||
|
from (
|
||||||
|
select
|
||||||
|
(json_array_elements(data::json)::json->>'avatar') avatar
|
||||||
|
, (json_array_elements(data::json)::json->>'id') id
|
||||||
|
, (json_array_elements(data::json)::json->>'properties') properties
|
||||||
|
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||||
|
from (select * from data_api.api_data
|
||||||
|
WHERE api_id='ff6f11b4-d49c-47d8-b870-7082e438' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||||
|
|
||||||
|
update data_api.api_data
|
||||||
|
set is_loaded = '1' ,
|
||||||
|
status = '1',
|
||||||
|
request_tm = current_timestamp(0)
|
||||||
|
where api_id='ff6f11b4-d49c-47d8-b870-7082e438';
|
||||||
|
\q
|
|
@ -0,0 +1,74 @@
|
||||||
|
# coding: utf-8
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import psycopg2
|
||||||
|
import uuid
|
||||||
|
import datetime
|
||||||
|
import time
|
||||||
|
import hashlib
|
||||||
|
import time
|
||||||
|
|
||||||
|
#荟聚
|
||||||
|
|
||||||
|
#全局变量,便于参数使用的预设值
|
||||||
|
current_date = datetime.date.today() # 获取当前日期
|
||||||
|
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
|
||||||
|
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
|
||||||
|
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
|
||||||
|
timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数;
|
||||||
|
sign_version = 'v2' # 签名版本号,固定值v2
|
||||||
|
nonce = str(uuid.uuid4())
|
||||||
|
|
||||||
|
#获取签名令牌
|
||||||
|
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
|
||||||
|
# 按照指定的格式拼接字符串
|
||||||
|
data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}"
|
||||||
|
# 使用SHA256算法计算哈希值
|
||||||
|
sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest()
|
||||||
|
return sha256_hash
|
||||||
|
|
||||||
|
#获取鉴权token
|
||||||
|
def get_token(url):
|
||||||
|
#请求鉴权接口
|
||||||
|
authRequest=requests.get(url)
|
||||||
|
#解析结果
|
||||||
|
if not authRequest: #若为空时,返回空
|
||||||
|
return
|
||||||
|
auth=json.loads(authRequest.text)
|
||||||
|
return auth
|
||||||
|
|
||||||
|
print('开始加载数据:api_contact_events:获取联系人事件')
|
||||||
|
authUrl='https://open.cloud.custouch.com/platform/cdp/token'
|
||||||
|
|
||||||
|
print('开始请求令牌。')
|
||||||
|
#authRequest=requests.get(authUrl)
|
||||||
|
#auth=json.loads(authRequest.text)
|
||||||
|
auth = get_token(authUrl)
|
||||||
|
#循环判断auth是否为空,若为空,等待30s后重新请求
|
||||||
|
i = 0
|
||||||
|
while 'error' in auth and i < 60:
|
||||||
|
time.sleep(60)
|
||||||
|
auth = get_token(authUrl)
|
||||||
|
i = i + 1
|
||||||
|
print('开始请求数据总数。')
|
||||||
|
url='https://open.cloud.custouch.com/platform/cdp/contact/event'
|
||||||
|
header={}
|
||||||
|
body={'app_secret':auth['app_secret'],'app_key':auth['app_key'],'contactId':'',}
|
||||||
|
dataReqL=requests.get(url,headers=header,params=body)
|
||||||
|
resL=json.loads(dataReqL.text)
|
||||||
|
# print(resL)
|
||||||
|
dataList=resL['data']
|
||||||
|
total=len(dataList)
|
||||||
|
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
||||||
|
host="172.17.0.8", port="5432")
|
||||||
|
print('数据库连接成功')
|
||||||
|
dataId=str(uuid.uuid4())
|
||||||
|
print('临时id:'+dataId)
|
||||||
|
json_object = json.dumps(dataList)
|
||||||
|
cur=conn.cursor()
|
||||||
|
sql="update data_api.api_data set is_loaded = '1' where api_id = 'a7757b4a-7038-40ef-b11e-81a2c5e0';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
|
||||||
|
cur.execute(sql,[dataId,'a7757b4a-7038-40ef-b11e-81a2c5e0',json_object,total])
|
||||||
|
conn.commit()
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
|
print('加载数据结束:api_contact_events:获取联系人事件')
|
|
@ -0,0 +1,57 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
|
||||||
|
DELETE FROM data_api.api_contact_events;
|
||||||
|
|
||||||
|
insert into data_api.api_contact_events (
|
||||||
|
at
|
||||||
|
, channel_id
|
||||||
|
, channel_name
|
||||||
|
, child_events
|
||||||
|
, connect_id
|
||||||
|
, connect_name
|
||||||
|
, id
|
||||||
|
, meta_id
|
||||||
|
, meta_name
|
||||||
|
, meta_remark
|
||||||
|
, properties
|
||||||
|
,etl_tx_dt
|
||||||
|
)
|
||||||
|
select
|
||||||
|
case when trim(both from at)='' then null else at::text end at
|
||||||
|
, case when trim(both from channel_id)='' then null else channel_id::text end channel_id
|
||||||
|
, case when trim(both from channel_name)='' then null else channel_name::text end channel_name
|
||||||
|
, case when trim(both from child_events)='' then null else child_events::text end child_events
|
||||||
|
, case when trim(both from connect_id)='' then null else connect_id::text end connect_id
|
||||||
|
, case when trim(both from connect_name)='' then null else connect_name::text end connect_name
|
||||||
|
, case when trim(both from id)='' then null else id::text end id
|
||||||
|
, case when trim(both from meta_id)='' then null else meta_id::text end meta_id
|
||||||
|
, case when trim(both from meta_name)='' then null else meta_name::text end meta_name
|
||||||
|
, case when trim(both from meta_remark)='' then null else meta_remark::text end meta_remark
|
||||||
|
, case when trim(both from properties)='' then null else properties::text end properties
|
||||||
|
,etl_tx_dt
|
||||||
|
from (
|
||||||
|
select
|
||||||
|
(json_array_elements(data::json)::json->>'at') at
|
||||||
|
, (json_array_elements(data::json)::json->>'channelId') channel_id
|
||||||
|
, (json_array_elements(data::json)::json->>'channelName') channel_name
|
||||||
|
, (json_array_elements(data::json)::json->>'childEvents') child_events
|
||||||
|
, (json_array_elements(data::json)::json->>'connectId') connect_id
|
||||||
|
, (json_array_elements(data::json)::json->>'connectName') connect_name
|
||||||
|
, (json_array_elements(data::json)::json->>'id') id
|
||||||
|
, (json_array_elements(data::json)::json->>'metaId') meta_id
|
||||||
|
, (json_array_elements(data::json)::json->>'metaName') meta_name
|
||||||
|
, (json_array_elements(data::json)::json->>'metaRemark') meta_remark
|
||||||
|
, (json_array_elements(data::json)::json->>'properties') properties
|
||||||
|
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||||
|
from (select * from data_api.api_data
|
||||||
|
WHERE api_id='a7757b4a-7038-40ef-b11e-81a2c5e0' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||||
|
|
||||||
|
update data_api.api_data
|
||||||
|
set is_loaded = '1' ,
|
||||||
|
status = '1',
|
||||||
|
request_tm = current_timestamp(0)
|
||||||
|
where api_id='a7757b4a-7038-40ef-b11e-81a2c5e0';
|
||||||
|
\q
|
|
@ -0,0 +1,128 @@
|
||||||
|
# coding: utf-8
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import datetime as dt
|
||||||
|
import psycopg2
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取指定时间段前的时间
|
||||||
|
:param h: 时间段
|
||||||
|
:return: 时间
|
||||||
|
"""
|
||||||
|
def formatted2_previous_hour(h):
|
||||||
|
if h==0:
|
||||||
|
return dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0)
|
||||||
|
# 减去一个小时,得到前一个小时的开始时间
|
||||||
|
start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h)
|
||||||
|
return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取token
|
||||||
|
:return: token
|
||||||
|
"""
|
||||||
|
def get_token():
|
||||||
|
url = "https://open.cloud.custouch.com/platform/cdp/token"
|
||||||
|
token_payload = {
|
||||||
|
'grant_type':'client_credentials',
|
||||||
|
'app_key':'e9b240eb3a9848e89a96c5e7857794da',
|
||||||
|
'app_secret':'f8cb7069e7dd468888e360bf8c259fc6',
|
||||||
|
'scope':'openid'
|
||||||
|
}
|
||||||
|
headers = {
|
||||||
|
'Content-Type': 'application/x-www-form-urlencoded'
|
||||||
|
}
|
||||||
|
response = requests.request("POST", url, headers=headers, data=token_payload)
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("获取token失败")
|
||||||
|
|
||||||
|
res = json.loads(response.text)
|
||||||
|
return res['access_token']
|
||||||
|
|
||||||
|
"""
|
||||||
|
获取API数据
|
||||||
|
:param token: token
|
||||||
|
:return: contact_ids
|
||||||
|
"""
|
||||||
|
def fetch_data(token):
|
||||||
|
url = "https://open.cloud.custouch.com/platform/cdp/contact/field"
|
||||||
|
params = {
|
||||||
|
'isIgnoreSurname':'false',
|
||||||
|
}
|
||||||
|
headers = {
|
||||||
|
'Authorization': f'Bearer {token}'
|
||||||
|
}
|
||||||
|
|
||||||
|
response = requests.request("GET", url, headers=headers, params=params)
|
||||||
|
if response.status_code != 200:
|
||||||
|
raise Exception("获取新建联系人ID失败")
|
||||||
|
|
||||||
|
res = json.loads(response.text)
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
PG_DSN = dict(database="dataops_db",
|
||||||
|
user ="dbuser_dba",
|
||||||
|
password="EmBRxnmmjnE3",
|
||||||
|
host ="124.221.232.219",
|
||||||
|
port ="5432")
|
||||||
|
|
||||||
|
|
||||||
|
def save_json_to_pg(data: list, api_id: str) -> None:
|
||||||
|
"""把列表落库:先软删历史,再插入新批次"""
|
||||||
|
print('[save_to_pg] 写入 PG...')
|
||||||
|
sql = """
|
||||||
|
UPDATE data_api.api_data
|
||||||
|
SET is_loaded = '1'
|
||||||
|
WHERE api_id = %s;
|
||||||
|
|
||||||
|
INSERT INTO data_api.api_data
|
||||||
|
(id, api_id, data, total_num, is_loaded, status,
|
||||||
|
request_tm, execute_tm, remark)
|
||||||
|
VALUES (%s, %s, %s, %s, '0', '0',
|
||||||
|
current_timestamp(0), current_timestamp(0), '');
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
with psycopg2.connect(**PG_DSN) as conn:
|
||||||
|
with conn.cursor() as cur:
|
||||||
|
cur.execute(sql,
|
||||||
|
(api_id,
|
||||||
|
str(uuid.uuid4()),
|
||||||
|
api_id,
|
||||||
|
json.dumps(data, ensure_ascii=False),
|
||||||
|
len(data)))
|
||||||
|
conn.commit()
|
||||||
|
cur.close()
|
||||||
|
print('[save_to_pg] 写入完成')
|
||||||
|
except psycopg2.Error as e:
|
||||||
|
print(f'[save_to_pg] 数据库错误: {e}')
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
print(f'[save_to_pg] 未知错误: {e}')
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
if 'conn' in locals():
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""主流程"""
|
||||||
|
# print(get_token())
|
||||||
|
print(f'开始请求新建联系人信息:{formatted2_previous_hour(0)}')
|
||||||
|
token = get_token()
|
||||||
|
# print(token)
|
||||||
|
# 获取新建联系人ID
|
||||||
|
print(f'开始请求新建联系人ID:{formatted2_previous_hour(0)}')
|
||||||
|
objs = fetch_data(token)
|
||||||
|
# 保存联系人ID
|
||||||
|
apiId = '1a11459f-5bb2-47b9-826f-9105e05f'
|
||||||
|
save_json_to_pg(objs, apiId)
|
||||||
|
print(f'结束请求联系人详情:{formatted2_previous_hour(0)}')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*******Main Section**************************************************************************/
|
||||||
|
\set ON_ERROR_STOP on
|
||||||
|
\set AUTOCOMMIT on
|
||||||
|
\timing on
|
||||||
|
|
||||||
|
DELETE FROM data_api.api_contact_fields;
|
||||||
|
|
||||||
|
insert into data_api.api_contact_fields (
|
||||||
|
data_type
|
||||||
|
, id
|
||||||
|
, is_enabled
|
||||||
|
, name
|
||||||
|
, remark
|
||||||
|
, sort
|
||||||
|
, type
|
||||||
|
,etl_tx_dt
|
||||||
|
)
|
||||||
|
select
|
||||||
|
case when trim(both from data_type)='' then null else data_type::text end data_type
|
||||||
|
, case when trim(both from id)='' then null else id::text end id
|
||||||
|
, case when trim(both from is_enabled)='' then null else is_enabled::text end is_enabled
|
||||||
|
, case when trim(both from name)='' then null else name::text end name
|
||||||
|
, case when trim(both from remark)='' then null else remark::text end remark
|
||||||
|
, case when trim(both from sort)='' then null else sort::text end sort
|
||||||
|
, case when trim(both from type)='' then null else type::text end type
|
||||||
|
,etl_tx_dt
|
||||||
|
from (
|
||||||
|
select
|
||||||
|
(json_array_elements(data::json)::json->>'dataType') data_type
|
||||||
|
, (json_array_elements(data::json)::json->>'id') id
|
||||||
|
, (json_array_elements(data::json)::json->>'isEnabled') is_enabled
|
||||||
|
, (json_array_elements(data::json)::json->>'name') name
|
||||||
|
, (json_array_elements(data::json)::json->>'remark') remark
|
||||||
|
, (json_array_elements(data::json)::json->>'sort') sort
|
||||||
|
, (json_array_elements(data::json)::json->>'type') type
|
||||||
|
,CURRENT_TIMESTAMP(0) etl_tx_dt
|
||||||
|
from (select * from data_api.api_data
|
||||||
|
WHERE api_id='1a11459f-5bb2-47b9-826f-9105e05f' and is_loaded = '0' order by request_tm desc limit 1) p )p;
|
||||||
|
|
||||||
|
update data_api.api_data
|
||||||
|
set is_loaded = '1' ,
|
||||||
|
status = '1',
|
||||||
|
request_tm = current_timestamp(0)
|
||||||
|
where api_id='1a11459f-5bb2-47b9-826f-9105e05f';
|
||||||
|
\q
|
Loading…
Reference in New Issue