From c6f7942089264253df85d9eecdb5a2b570774db8 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 11 Sep 2025 14:30:25 +0800 Subject: [PATCH] =?UTF-8?q?add=20workflow=20=E5=B8=82=E5=9C=BA=E6=98=93API?= =?UTF-8?q?,dev?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../S98_S_api_contact_created.sql | 34 ++++ .../api_contact_created/sa_foreign_tables.sql | 15 ++ .../null/api_contact_created/sa_tables.sql | 31 +++ .../S98_S_api_contact_event_meta.sql | 38 ++++ .../sa_foreign_tables.sql | 16 ++ .../null/api_contact_event_meta/sa_tables.sql | 35 ++++ .../S98_S_api_contact_events.sql | 66 +++++++ .../api_contact_events/sa_foreign_tables.sql | 23 +++ .../null/api_contact_events/sa_tables.sql | 63 ++++++ .../S98_S_api_contact_fields.sql | 50 +++++ .../api_contact_fields/sa_foreign_tables.sql | 19 ++ .../null/api_contact_fields/sa_tables.sql | 47 +++++ .../S98_S_api_contact_update_info.sql | 34 ++++ .../sa_foreign_tables.sql | 15 ++ .../api_contact_update_info/sa_tables.sql | 31 +++ .../null/市场易API/wf_dag_${wf_fold}.py | 186 ++++++++++++++++++ .../contact_event_meta_feign.py | 126 ++++++++++++ .../contact_event_meta_load.sql | 36 ++++ .../contact_create_list_feign.py | 159 +++++++++++++++ .../contact_create_list_load.sql | 33 ++++ .../contact_update_feign.py | 159 +++++++++++++++ .../contact_update_load.sql | 33 ++++ .../api_contact_events_feign.py | 74 +++++++ .../api_contact_events_load.sql | 57 ++++++ .../contact_fields_feign.py | 128 ++++++++++++ .../contact_fields_load.sql | 45 +++++ 26 files changed, 1553 insertions(+) create mode 100644 dev/workflow/TK_Cust/null/api_contact_created/S98_S_api_contact_created.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_created/sa_foreign_tables.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_created/sa_tables.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_event_meta/S98_S_api_contact_event_meta.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_event_meta/sa_foreign_tables.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_event_meta/sa_tables.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_events/S98_S_api_contact_events.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_events/sa_foreign_tables.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_events/sa_tables.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_fields/S98_S_api_contact_fields.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_fields/sa_foreign_tables.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_fields/sa_tables.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_update_info/S98_S_api_contact_update_info.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_update_info/sa_foreign_tables.sql create mode 100644 dev/workflow/TK_Cust/null/api_contact_update_info/sa_tables.sql create mode 100644 dev/workflow/TK_Cust/null/市场易API/wf_dag_${wf_fold}.py create mode 100644 dev/workflow/TK_Cust/null/获取事件元数据/contact_event_meta_feign.py create mode 100644 dev/workflow/TK_Cust/null/获取事件元数据/contact_event_meta_load.sql create mode 100644 dev/workflow/TK_Cust/null/获取新创建联系人信息/contact_create_list_feign.py create mode 100644 dev/workflow/TK_Cust/null/获取新创建联系人信息/contact_create_list_load.sql create mode 100644 dev/workflow/TK_Cust/null/获取更新联系人信息/contact_update_feign.py create mode 100644 dev/workflow/TK_Cust/null/获取更新联系人信息/contact_update_load.sql create mode 100644 dev/workflow/TK_Cust/null/获取联系人事件/api_contact_events_feign.py create mode 100644 dev/workflow/TK_Cust/null/获取联系人事件/api_contact_events_load.sql create mode 100644 dev/workflow/TK_Cust/null/获取联系人属性列表/contact_fields_feign.py create mode 100644 dev/workflow/TK_Cust/null/获取联系人属性列表/contact_fields_load.sql diff --git a/dev/workflow/TK_Cust/null/api_contact_created/S98_S_api_contact_created.sql b/dev/workflow/TK_Cust/null/api_contact_created/S98_S_api_contact_created.sql new file mode 100644 index 0000000..e55a008 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_created/S98_S_api_contact_created.sql @@ -0,0 +1,34 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on +delete from p10_sa.S98_S_api_contact_created +; +insert into p10_sa.S98_S_api_contact_created +( avatar + , id + , properties + , etl_tx_dt ) + select + avatar + , id + , properties + , etl_tx_dt + from p00_tal.S98_S_api_contact_created + ; + delete from p12_sfull.S98_S_api_contact_created +; +; +insert into p12_sfull.S98_S_api_contact_created +( avatar + , id + , properties + , etl_tx_dt ) + select + avatar + , id + , properties + , etl_tx_dt + from p10_sa.S98_S_api_contact_created +; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/null/api_contact_created/sa_foreign_tables.sql b/dev/workflow/TK_Cust/null/api_contact_created/sa_foreign_tables.sql new file mode 100644 index 0000000..04eccc3 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_created/sa_foreign_tables.sql @@ -0,0 +1,15 @@ + +CREATE FOREIGN TABLE if not exists p00_tal.S98_S_api_contact_created ( + avatar TEXT + , id TEXT + , properties TEXT + , etl_tx_dt TIMESTAMP +) + + +SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'api_contact_created' ); + + + + + diff --git a/dev/workflow/TK_Cust/null/api_contact_created/sa_tables.sql b/dev/workflow/TK_Cust/null/api_contact_created/sa_tables.sql new file mode 100644 index 0000000..abcbb9a --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_created/sa_tables.sql @@ -0,0 +1,31 @@ + +create table if not exists p10_sa.S98_S_api_contact_created ( + avatar TEXT + , id TEXT + , properties TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p10_sa.S98_S_api_contact_created.avatar IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_created.id IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_created.properties IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_created.etl_tx_dt IS ''; + +COMMENT ON TABLE p10_sa.S98_S_api_contact_created IS ''; + + + +create table if not exists p12_sfull.S98_S_api_contact_created ( + avatar TEXT + , id TEXT + , properties TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_created.avatar IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_created.id IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_created.properties IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_created.etl_tx_dt IS ''; + +COMMENT ON TABLE p12_sfull.S98_S_api_contact_created IS ''; + diff --git a/dev/workflow/TK_Cust/null/api_contact_event_meta/S98_S_api_contact_event_meta.sql b/dev/workflow/TK_Cust/null/api_contact_event_meta/S98_S_api_contact_event_meta.sql new file mode 100644 index 0000000..1ff54f2 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_event_meta/S98_S_api_contact_event_meta.sql @@ -0,0 +1,38 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on +delete from p10_sa.S98_S_api_contact_event_meta +; +insert into p10_sa.S98_S_api_contact_event_meta +( id + , name + , remark + , type + , etl_tx_dt ) + select + id + , name + , remark + , type + , etl_tx_dt + from p00_tal.S98_S_api_contact_event_meta + ; + delete from p12_sfull.S98_S_api_contact_event_meta +; +; +insert into p12_sfull.S98_S_api_contact_event_meta +( id + , name + , remark + , type + , etl_tx_dt ) + select + id + , name + , remark + , type + , etl_tx_dt + from p10_sa.S98_S_api_contact_event_meta +; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/null/api_contact_event_meta/sa_foreign_tables.sql b/dev/workflow/TK_Cust/null/api_contact_event_meta/sa_foreign_tables.sql new file mode 100644 index 0000000..794e7a5 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_event_meta/sa_foreign_tables.sql @@ -0,0 +1,16 @@ + +CREATE FOREIGN TABLE if not exists p00_tal.S98_S_api_contact_event_meta ( + id TEXT + , name TEXT + , remark TEXT + , type TEXT + , etl_tx_dt TIMESTAMP +) + + +SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'api_contact_event_meta' ); + + + + + diff --git a/dev/workflow/TK_Cust/null/api_contact_event_meta/sa_tables.sql b/dev/workflow/TK_Cust/null/api_contact_event_meta/sa_tables.sql new file mode 100644 index 0000000..c2a2e38 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_event_meta/sa_tables.sql @@ -0,0 +1,35 @@ + +create table if not exists p10_sa.S98_S_api_contact_event_meta ( + id TEXT + , name TEXT + , remark TEXT + , type TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p10_sa.S98_S_api_contact_event_meta.id IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_event_meta.name IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_event_meta.remark IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_event_meta.type IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_event_meta.etl_tx_dt IS ''; + +COMMENT ON TABLE p10_sa.S98_S_api_contact_event_meta IS ''; + + + +create table if not exists p12_sfull.S98_S_api_contact_event_meta ( + id TEXT + , name TEXT + , remark TEXT + , type TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_event_meta.id IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_event_meta.name IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_event_meta.remark IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_event_meta.type IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_event_meta.etl_tx_dt IS ''; + +COMMENT ON TABLE p12_sfull.S98_S_api_contact_event_meta IS ''; + diff --git a/dev/workflow/TK_Cust/null/api_contact_events/S98_S_api_contact_events.sql b/dev/workflow/TK_Cust/null/api_contact_events/S98_S_api_contact_events.sql new file mode 100644 index 0000000..cbf60af --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_events/S98_S_api_contact_events.sql @@ -0,0 +1,66 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on +delete from p10_sa.S98_S_api_contact_events +; +insert into p10_sa.S98_S_api_contact_events +( at + , channel_id + , channel_name + , child_events + , connect_id + , connect_name + , id + , meta_id + , meta_name + , meta_remark + , properties + , etl_tx_dt ) + select + at + , channel_id + , channel_name + , child_events + , connect_id + , connect_name + , id + , meta_id + , meta_name + , meta_remark + , properties + , etl_tx_dt + from p00_tal.S98_S_api_contact_events + ; + delete from p12_sfull.S98_S_api_contact_events +; +; +insert into p12_sfull.S98_S_api_contact_events +( at + , channel_id + , channel_name + , child_events + , connect_id + , connect_name + , id + , meta_id + , meta_name + , meta_remark + , properties + , etl_tx_dt ) + select + at + , channel_id + , channel_name + , child_events + , connect_id + , connect_name + , id + , meta_id + , meta_name + , meta_remark + , properties + , etl_tx_dt + from p10_sa.S98_S_api_contact_events +; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/null/api_contact_events/sa_foreign_tables.sql b/dev/workflow/TK_Cust/null/api_contact_events/sa_foreign_tables.sql new file mode 100644 index 0000000..0910777 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_events/sa_foreign_tables.sql @@ -0,0 +1,23 @@ + +CREATE FOREIGN TABLE if not exists p00_tal.S98_S_api_contact_events ( + at TEXT + , channel_id TEXT + , channel_name TEXT + , child_events TEXT + , connect_id TEXT + , connect_name TEXT + , id TEXT + , meta_id TEXT + , meta_name TEXT + , meta_remark TEXT + , properties TEXT + , etl_tx_dt TIMESTAMP +) + + +SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'api_contact_events' ); + + + + + diff --git a/dev/workflow/TK_Cust/null/api_contact_events/sa_tables.sql b/dev/workflow/TK_Cust/null/api_contact_events/sa_tables.sql new file mode 100644 index 0000000..152c738 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_events/sa_tables.sql @@ -0,0 +1,63 @@ + +create table if not exists p10_sa.S98_S_api_contact_events ( + at TEXT + , channel_id TEXT + , channel_name TEXT + , child_events TEXT + , connect_id TEXT + , connect_name TEXT + , id TEXT + , meta_id TEXT + , meta_name TEXT + , meta_remark TEXT + , properties TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.at IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.channel_id IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.channel_name IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.child_events IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.connect_id IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.connect_name IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.id IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.meta_id IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.meta_name IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.meta_remark IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.properties IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_events.etl_tx_dt IS ''; + +COMMENT ON TABLE p10_sa.S98_S_api_contact_events IS ''; + + + +create table if not exists p12_sfull.S98_S_api_contact_events ( + at TEXT + , channel_id TEXT + , channel_name TEXT + , child_events TEXT + , connect_id TEXT + , connect_name TEXT + , id TEXT + , meta_id TEXT + , meta_name TEXT + , meta_remark TEXT + , properties TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.at IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.channel_id IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.channel_name IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.child_events IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.connect_id IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.connect_name IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.id IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.meta_id IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.meta_name IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.meta_remark IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.properties IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_events.etl_tx_dt IS ''; + +COMMENT ON TABLE p12_sfull.S98_S_api_contact_events IS ''; + diff --git a/dev/workflow/TK_Cust/null/api_contact_fields/S98_S_api_contact_fields.sql b/dev/workflow/TK_Cust/null/api_contact_fields/S98_S_api_contact_fields.sql new file mode 100644 index 0000000..c18dd83 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_fields/S98_S_api_contact_fields.sql @@ -0,0 +1,50 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on +delete from p10_sa.S98_S_api_contact_fields +; +insert into p10_sa.S98_S_api_contact_fields +( data_type + , id + , is_enabled + , name + , remark + , sort + , type + , etl_tx_dt ) + select + data_type + , id + , is_enabled + , name + , remark + , sort + , type + , etl_tx_dt + from p00_tal.S98_S_api_contact_fields + ; + delete from p12_sfull.S98_S_api_contact_fields +; +; +insert into p12_sfull.S98_S_api_contact_fields +( data_type + , id + , is_enabled + , name + , remark + , sort + , type + , etl_tx_dt ) + select + data_type + , id + , is_enabled + , name + , remark + , sort + , type + , etl_tx_dt + from p10_sa.S98_S_api_contact_fields +; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/null/api_contact_fields/sa_foreign_tables.sql b/dev/workflow/TK_Cust/null/api_contact_fields/sa_foreign_tables.sql new file mode 100644 index 0000000..e3a2e31 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_fields/sa_foreign_tables.sql @@ -0,0 +1,19 @@ + +CREATE FOREIGN TABLE if not exists p00_tal.S98_S_api_contact_fields ( + data_type TEXT + , id TEXT + , is_enabled TEXT + , name TEXT + , remark TEXT + , sort TEXT + , type TEXT + , etl_tx_dt TIMESTAMP +) + + +SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'api_contact_fields' ); + + + + + diff --git a/dev/workflow/TK_Cust/null/api_contact_fields/sa_tables.sql b/dev/workflow/TK_Cust/null/api_contact_fields/sa_tables.sql new file mode 100644 index 0000000..cb29df3 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_fields/sa_tables.sql @@ -0,0 +1,47 @@ + +create table if not exists p10_sa.S98_S_api_contact_fields ( + data_type TEXT + , id TEXT + , is_enabled TEXT + , name TEXT + , remark TEXT + , sort TEXT + , type TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.data_type IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.id IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.is_enabled IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.name IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.remark IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.sort IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.type IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_fields.etl_tx_dt IS ''; + +COMMENT ON TABLE p10_sa.S98_S_api_contact_fields IS ''; + + + +create table if not exists p12_sfull.S98_S_api_contact_fields ( + data_type TEXT + , id TEXT + , is_enabled TEXT + , name TEXT + , remark TEXT + , sort TEXT + , type TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.data_type IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.id IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.is_enabled IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.name IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.remark IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.sort IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.type IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_fields.etl_tx_dt IS ''; + +COMMENT ON TABLE p12_sfull.S98_S_api_contact_fields IS ''; + diff --git a/dev/workflow/TK_Cust/null/api_contact_update_info/S98_S_api_contact_update_info.sql b/dev/workflow/TK_Cust/null/api_contact_update_info/S98_S_api_contact_update_info.sql new file mode 100644 index 0000000..ee0ca8e --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_update_info/S98_S_api_contact_update_info.sql @@ -0,0 +1,34 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on +delete from p10_sa.S98_S_api_contact_update_info +; +insert into p10_sa.S98_S_api_contact_update_info +( avatar + , id + , properties + , etl_tx_dt ) + select + avatar + , id + , properties + , etl_tx_dt + from p00_tal.S98_S_api_contact_update_info + ; + delete from p12_sfull.S98_S_api_contact_update_info +; +; +insert into p12_sfull.S98_S_api_contact_update_info +( avatar + , id + , properties + , etl_tx_dt ) + select + avatar + , id + , properties + , etl_tx_dt + from p10_sa.S98_S_api_contact_update_info +; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/null/api_contact_update_info/sa_foreign_tables.sql b/dev/workflow/TK_Cust/null/api_contact_update_info/sa_foreign_tables.sql new file mode 100644 index 0000000..efaf422 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_update_info/sa_foreign_tables.sql @@ -0,0 +1,15 @@ + +CREATE FOREIGN TABLE if not exists p00_tal.S98_S_api_contact_update_info ( + avatar TEXT + , id TEXT + , properties TEXT + , etl_tx_dt TIMESTAMP +) + + +SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'api_contact_update_info' ); + + + + + diff --git a/dev/workflow/TK_Cust/null/api_contact_update_info/sa_tables.sql b/dev/workflow/TK_Cust/null/api_contact_update_info/sa_tables.sql new file mode 100644 index 0000000..3866204 --- /dev/null +++ b/dev/workflow/TK_Cust/null/api_contact_update_info/sa_tables.sql @@ -0,0 +1,31 @@ + +create table if not exists p10_sa.S98_S_api_contact_update_info ( + avatar TEXT + , id TEXT + , properties TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p10_sa.S98_S_api_contact_update_info.avatar IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_update_info.id IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_update_info.properties IS ''; + COMMENT ON COLUMN p10_sa.S98_S_api_contact_update_info.etl_tx_dt IS ''; + +COMMENT ON TABLE p10_sa.S98_S_api_contact_update_info IS ''; + + + +create table if not exists p12_sfull.S98_S_api_contact_update_info ( + avatar TEXT + , id TEXT + , properties TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_update_info.avatar IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_update_info.id IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_update_info.properties IS ''; + COMMENT ON COLUMN p12_sfull.S98_S_api_contact_update_info.etl_tx_dt IS ''; + +COMMENT ON TABLE p12_sfull.S98_S_api_contact_update_info IS ''; + diff --git a/dev/workflow/TK_Cust/null/市场易API/wf_dag_${wf_fold}.py b/dev/workflow/TK_Cust/null/市场易API/wf_dag_${wf_fold}.py new file mode 100644 index 0000000..0757550 --- /dev/null +++ b/dev/workflow/TK_Cust/null/市场易API/wf_dag_${wf_fold}.py @@ -0,0 +1,186 @@ +#!/usr/bin/python +# -*- encoding=utf-8 -*- +from airflow import DAG +from datetime import datetime, timedelta +from airflow.contrib.hooks.ssh_hook import SSHHook +from airflow.contrib.operators.ssh_operator import SSHOperator +from airflow.sensors.external_task_sensor import ExternalTaskSensor +import json + +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.operators.email_operator import EmailOperator +from airflow.utils.trigger_rule import TriggerRule + + +sshHook = SSHHook(ssh_conn_id ='ssh_air') +default_args = { +'owner': 'tek_newsletter@163.com', +'email_on_failure': True, +'email_on_retry':True, +'start_date': datetime(2024, 1, 1), +'depends_on_past': False, +'retries': 6, +'retry_delay': timedelta(minutes=10), +} + +dag = DAG('wf_dag_${wf_fold}', default_args=default_args, +schedule_interval="0 1 * * *", +catchup=False, +dagrun_timeout=timedelta(minutes=160), +max_active_runs=3) + +task_failed = EmailOperator ( + dag=dag, + trigger_rule=TriggerRule.ONE_FAILED, + task_id="task_failed", + to=["tek_newsletter@163.com"], + cc=[""], + subject="${wf_fold}_failed", + html_content='

您好,${wf_fold}作业失败,请及时处理"

') + +contact_event_meta_feign = SSHOperator( +ssh_hook=sshHook, +task_id='contact_event_meta_feign', +command='python3 /data/airflow/etl/API/contact_event_meta_feign.py', +depends_on_past=False, +retries=3, +dag=dag) + +contact_event_meta_load = SSHOperator( +ssh_hook=sshHook, +task_id='contact_event_meta_load', +command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}', +params={'my_param':"contact_event_meta_load"}, +depends_on_past=False, +retries=3, +dag=dag) + +contact_event_meta_feign >> contact_event_meta_load + +contact_create_list_feign = SSHOperator( +ssh_hook=sshHook, +task_id='contact_create_list_feign', +command='python3 /data/airflow/etl/API/contact_create_list_feign.py', +depends_on_past=False, +retries=3, +dag=dag) + +contact_create_list_load = SSHOperator( +ssh_hook=sshHook, +task_id='contact_create_list_load', +command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}', +params={'my_param':"contact_create_list_load"}, +depends_on_past=False, +retries=3, +dag=dag) + +contact_create_list_feign >> contact_create_list_load + +contact_update_feign = SSHOperator( +ssh_hook=sshHook, +task_id='contact_update_feign', +command='python3 /data/airflow/etl/API/contact_update_feign.py', +depends_on_past=False, +retries=3, +dag=dag) + +contact_update_load = SSHOperator( +ssh_hook=sshHook, +task_id='contact_update_load', +command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}', +params={'my_param':"contact_update_load"}, +depends_on_past=False, +retries=3, +dag=dag) + +contact_update_feign >> contact_update_load + +api_contact_events_feign = SSHOperator( +ssh_hook=sshHook, +task_id='api_contact_events_feign', +command='python3 /data/airflow/etl/API/api_contact_events_feign.py', +depends_on_past=False, +retries=3, +dag=dag) + +api_contact_events_load = SSHOperator( +ssh_hook=sshHook, +task_id='api_contact_events_load', +command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}', +params={'my_param':"api_contact_events_load"}, +depends_on_past=False, +retries=3, +dag=dag) + +api_contact_events_feign >> api_contact_events_load + +contact_fields_feign = SSHOperator( +ssh_hook=sshHook, +task_id='contact_fields_feign', +command='python3 /data/airflow/etl/API/contact_fields_feign.py', +depends_on_past=False, +retries=3, +dag=dag) + +contact_fields_load = SSHOperator( +ssh_hook=sshHook, +task_id='contact_fields_load', +command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}', +params={'my_param':"contact_fields_load"}, +depends_on_past=False, +retries=3, +dag=dag) + +contact_fields_feign >> contact_fields_load + +api_contact_created_3850 = SSHOperator( +ssh_hook=sshHook, +task_id='api_contact_created_3850', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_api_contact_created"}, +depends_on_past=False, +retries=3, +dag=dag) + +api_contact_events_6043 = SSHOperator( +ssh_hook=sshHook, +task_id='api_contact_events_6043', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_api_contact_events"}, +depends_on_past=False, +retries=3, +dag=dag) + +api_contact_fields_4587 = SSHOperator( +ssh_hook=sshHook, +task_id='api_contact_fields_4587', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_api_contact_fields"}, +depends_on_past=False, +retries=3, +dag=dag) + +api_contact_update_info_2218 = SSHOperator( +ssh_hook=sshHook, +task_id='api_contact_update_info_2218', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_api_contact_update_info"}, +depends_on_past=False, +retries=3, +dag=dag) + +api_contact_event_meta_9651 = SSHOperator( +ssh_hook=sshHook, +task_id='api_contact_event_meta_9651', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_api_contact_event_meta"}, +depends_on_past=False, +retries=3, +dag=dag) + +contact_create_list_load >> api_contact_created_3850 +contact_event_meta_load >> api_contact_event_meta_9651 +api_contact_events_load >> api_contact_events_6043 +contact_update_load >> api_contact_update_info_2218 +contact_fields_load >> api_contact_fields_4587 +api_contact_fields_4587 >> task_failed diff --git a/dev/workflow/TK_Cust/null/获取事件元数据/contact_event_meta_feign.py b/dev/workflow/TK_Cust/null/获取事件元数据/contact_event_meta_feign.py new file mode 100644 index 0000000..e4d302e --- /dev/null +++ b/dev/workflow/TK_Cust/null/获取事件元数据/contact_event_meta_feign.py @@ -0,0 +1,126 @@ +# coding: utf-8 +import requests +import json +import datetime as dt +import psycopg2 +import uuid + +""" +获取指定时间段前的时间 +:param h: 时间段 +:return: 时间 +""" +def formatted2_previous_hour(h): + if h==0: + return dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h) + return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S") + +""" +获取token +:return: token +""" +def get_token(): + url = "https://open.cloud.custouch.com/platform/cdp/token" + token_payload = { + 'grant_type':'client_credentials', + 'app_key':'e9b240eb3a9848e89a96c5e7857794da', + 'app_secret':'f8cb7069e7dd468888e360bf8c259fc6', + 'scope':'openid' + } + headers = { + 'Content-Type': 'application/x-www-form-urlencoded' + } + response = requests.request("POST", url, headers=headers, data=token_payload) + if response.status_code != 200: + raise Exception("获取token失败") + + res = json.loads(response.text) + return res['access_token'] + +""" +获取API数据 +:param token: token +:return: contact_ids +""" +def fetch_data(token): + url = "https://open.cloud.custouch.com/platform/cdp/event_meta" + params = {} + headers = { + 'Authorization': f'Bearer {token}' + } + + response = requests.request("GET", url, headers=headers, params=params) + if response.status_code != 200: + raise Exception("获取新建联系人ID失败") + + res = json.loads(response.text) + return res + + +PG_DSN = dict(database="dataops_db", + user ="dbuser_dba", + password="EmBRxnmmjnE3", + host ="124.221.232.219", + port ="5432") + + +def save_json_to_pg(data: list, api_id: str) -> None: + """把列表落库:先软删历史,再插入新批次""" + print('[save_to_pg] 写入 PG...') + sql = """ + UPDATE data_api.api_data + SET is_loaded = '1' + WHERE api_id = %s; + + INSERT INTO data_api.api_data + (id, api_id, data, total_num, is_loaded, status, + request_tm, execute_tm, remark) + VALUES (%s, %s, %s, %s, '0', '0', + current_timestamp(0), current_timestamp(0), ''); + """ + + try: + with psycopg2.connect(**PG_DSN) as conn: + with conn.cursor() as cur: + cur.execute(sql, + (api_id, + str(uuid.uuid4()), + api_id, + json.dumps(data, ensure_ascii=False), + len(data))) + conn.commit() + cur.close() + print('[save_to_pg] 写入完成') + except psycopg2.Error as e: + print(f'[save_to_pg] 数据库错误: {e}') + raise + except Exception as e: + print(f'[save_to_pg] 未知错误: {e}') + raise + finally: + if 'conn' in locals(): + conn.close() + + + +def main() -> None: + """主流程""" + # print(get_token()) + print(f'开始请求新建联系人信息:{formatted2_previous_hour(0)}') + token = get_token() + # print(token) + # 获取新建联系人ID + print(f'开始请求新建联系人ID:{formatted2_previous_hour(0)}') + objs = fetch_data(token) + # 保存联系人ID + apiId = 'ae05ffcd-50bb-4b4e-8c2f-a7ddca84' + save_json_to_pg(objs, apiId) + print(f'结束请求联系人详情:{formatted2_previous_hour(0)}') + + +if __name__ == '__main__': + main() + diff --git a/dev/workflow/TK_Cust/null/获取事件元数据/contact_event_meta_load.sql b/dev/workflow/TK_Cust/null/获取事件元数据/contact_event_meta_load.sql new file mode 100644 index 0000000..b62acb4 --- /dev/null +++ b/dev/workflow/TK_Cust/null/获取事件元数据/contact_event_meta_load.sql @@ -0,0 +1,36 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +DELETE FROM data_api.api_contact_event_meta; + +insert into data_api.api_contact_event_meta ( + id + , name + , remark + , type + ,etl_tx_dt +) +select + case when trim(both from id)='' then null else id::text end id + , case when trim(both from name)='' then null else name::text end name + , case when trim(both from remark)='' then null else remark::text end remark + , case when trim(both from type)='' then null else type::text end type +,etl_tx_dt +from ( +select + (json_array_elements(data::json)::json->>'id') id + , (json_array_elements(data::json)::json->>'name') name + , (json_array_elements(data::json)::json->>'remark') remark + , (json_array_elements(data::json)::json->>'type') type + ,CURRENT_TIMESTAMP(0) etl_tx_dt +from (select * from data_api.api_data +WHERE api_id='ae05ffcd-50bb-4b4e-8c2f-a7ddca84' and is_loaded = '0' order by request_tm desc limit 1) p )p; + +update data_api.api_data +set is_loaded = '1' , + status = '1', + request_tm = current_timestamp(0) +where api_id='ae05ffcd-50bb-4b4e-8c2f-a7ddca84'; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/null/获取新创建联系人信息/contact_create_list_feign.py b/dev/workflow/TK_Cust/null/获取新创建联系人信息/contact_create_list_feign.py new file mode 100644 index 0000000..13527a9 --- /dev/null +++ b/dev/workflow/TK_Cust/null/获取新创建联系人信息/contact_create_list_feign.py @@ -0,0 +1,159 @@ +# coding: utf-8 +import requests +import json +import datetime as dt +import psycopg2 +import uuid + +""" +获取指定时间段前的时间 +:param h: 时间段 +:return: 时间 +""" +def formatted2_previous_hour(h): + if h==0: + return dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h) + return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S") + +""" +获取token +:return: token +""" +def get_token(): + url = "https://open.cloud.custouch.com/platform/cdp/token" + token_payload = { + 'grant_type':'client_credentials', + 'app_key':'e9b240eb3a9848e89a96c5e7857794da', + 'app_secret':'f8cb7069e7dd468888e360bf8c259fc6', + 'scope':'openid' + } + headers = { + 'Content-Type': 'application/x-www-form-urlencoded' + } + response = requests.request("POST", url, headers=headers, data=token_payload) + if response.status_code != 200: + raise Exception("获取token失败") + + res = json.loads(response.text) + return res['access_token'] + +""" +获取联系人id +:param token: token +:return: contact_ids +""" +def get_contact_ids(token): + url = "https://open.cloud.custouch.com/platform/cdp/contact/batch" + params = { + 'start': formatted2_previous_hour(72), + 'end': formatted2_previous_hour(0), + 'by':'CreatedAt' + } + headers = { + 'Authorization': f'Bearer {token}' + } + + response = requests.request("GET", url, headers=headers, params=params) + if response.status_code != 200: + raise Exception("获取新建联系人ID失败") + + res = json.loads(response.text) + return res['model'] + +""" +获取联系人详情 +:param token: token +:param ids: 联系人id +:return: contact_detail +""" +def get_contact_detail(token, ids): + url = "https://open.cloud.custouch.com/platform/cdp/contact/batch/properties" + payload = json.dumps(ids) + headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {token}' + } + + response = requests.request("POST", url, headers=headers, data=payload) + + if response.status_code != 200: + raise Exception("获取新建联系人详情失败") + + res = json.loads(response.text) + return res + + + + +PG_DSN = dict(database="dataops_db", + user ="dbuser_dba", + password="EmBRxnmmjnE3", + host ="124.221.232.219", + port ="5432") + + +def save_json_to_pg(data: list, api_id: str) -> None: + """把列表落库:先软删历史,再插入新批次""" + print('[save_to_pg] 写入 PG...') + sql = """ + UPDATE data_api.api_data + SET is_loaded = '1' + WHERE api_id = %s; + + INSERT INTO data_api.api_data + (id, api_id, data, total_num, is_loaded, status, + request_tm, execute_tm, remark) + VALUES (%s, %s, %s, %s, '0', '0', + current_timestamp(0), current_timestamp(0), ''); + """ + + try: + with psycopg2.connect(**PG_DSN) as conn: + with conn.cursor() as cur: + cur.execute(sql, + (api_id, + str(uuid.uuid4()), + api_id, + json.dumps(data, ensure_ascii=False), + len(data))) + conn.commit() + cur.close() + print('[save_to_pg] 写入完成') + except psycopg2.Error as e: + print(f'[save_to_pg] 数据库错误: {e}') + raise + except Exception as e: + print(f'[save_to_pg] 未知错误: {e}') + raise + finally: + if 'conn' in locals(): + conn.close() + +def main() -> None: + """主流程""" + # print(get_token() + print(f'开始请求新建联系人信息:{formatted2_previous_hour(0)}') + token = get_token() + # print(token) + # 获取新建联系人ID + print(f'开始请求新建联系人ID:{formatted2_previous_hour(0)}') + ids = get_contact_ids(token) + # 保存联系人ID + apiId_ids = 'c54b8459-77ae-4ab2-8e34-01897c2d' + save_json_to_pg(ids, apiId_ids) + # print(ids) + # 获取联系人详情 + print(f'开始请求联系人详情:{formatted2_previous_hour(0)}') + objs = get_contact_detail(token, ids) + # 保存联系人详情 + apiId = 'ef261acf-92e8-414f-ae92-c0764164' + save_json_to_pg(objs, apiId) + print(f'结束请求联系人详情:{formatted2_previous_hour(0)}') + + +if __name__ == '__main__': + main() + diff --git a/dev/workflow/TK_Cust/null/获取新创建联系人信息/contact_create_list_load.sql b/dev/workflow/TK_Cust/null/获取新创建联系人信息/contact_create_list_load.sql new file mode 100644 index 0000000..4532e74 --- /dev/null +++ b/dev/workflow/TK_Cust/null/获取新创建联系人信息/contact_create_list_load.sql @@ -0,0 +1,33 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +DELETE FROM data_api.api_contact_created; + +insert into data_api.api_contact_created ( + avatar + , id + , properties + ,etl_tx_dt +) +select + case when trim(both from avatar)='' then null else avatar::text end avatar + , case when trim(both from id)='' then null else id::text end id + , case when trim(both from properties)='' then null else properties::text end properties +,etl_tx_dt +from ( +select + (json_array_elements(data::json)::json->>'avatar') avatar + , (json_array_elements(data::json)::json->>'id') id + , (json_array_elements(data::json)::json->>'properties') properties + ,CURRENT_TIMESTAMP(0) etl_tx_dt +from (select * from data_api.api_data +WHERE api_id='ef261acf-92e8-414f-ae92-c0764164' and is_loaded = '0' order by request_tm desc limit 1) p )p; + +update data_api.api_data +set is_loaded = '1' , + status = '1', + request_tm = current_timestamp(0) +where api_id='ef261acf-92e8-414f-ae92-c0764164'; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/null/获取更新联系人信息/contact_update_feign.py b/dev/workflow/TK_Cust/null/获取更新联系人信息/contact_update_feign.py new file mode 100644 index 0000000..79975c3 --- /dev/null +++ b/dev/workflow/TK_Cust/null/获取更新联系人信息/contact_update_feign.py @@ -0,0 +1,159 @@ +# coding: utf-8 +import requests +import json +import datetime as dt +import psycopg2 +import uuid + +""" +获取指定时间段前的时间 +:param h: 时间段 +:return: 时间 +""" +def formatted2_previous_hour(h): + if h==0: + return dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h) + return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S") + +""" +获取token +:return: token +""" +def get_token(): + url = "https://open.cloud.custouch.com/platform/cdp/token" + token_payload = { + 'grant_type':'client_credentials', + 'app_key':'e9b240eb3a9848e89a96c5e7857794da', + 'app_secret':'f8cb7069e7dd468888e360bf8c259fc6', + 'scope':'openid' + } + headers = { + 'Content-Type': 'application/x-www-form-urlencoded' + } + response = requests.request("POST", url, headers=headers, data=token_payload) + if response.status_code != 200: + raise Exception("获取token失败") + + res = json.loads(response.text) + return res['access_token'] + +""" +获取联系人id +:param token: token +:return: contact_ids +""" +def get_contact_ids(token): + url = "https://open.cloud.custouch.com/platform/cdp/contact/batch" + params = { + 'start': formatted2_previous_hour(72), + 'end': formatted2_previous_hour(0), + 'by':'UpdatedAt', + } + headers = { + 'Authorization': f'Bearer {token}' + } + + response = requests.request("GET", url, headers=headers, params=params) + if response.status_code != 200: + raise Exception("获取新建联系人ID失败") + + res = json.loads(response.text) + return res['model'] + +""" +获取联系人详情 +:param token: token +:param ids: 联系人id +:return: contact_detail +""" +def get_contact_detail(token, ids): + url = "https://open.cloud.custouch.com/platform/cdp/contact/batch/properties" + payload = json.dumps(ids) + headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {token}' + } + + response = requests.request("POST", url, headers=headers, data=payload) + + if response.status_code != 200: + raise Exception("获取新建联系人详情失败") + + res = json.loads(response.text) + return res + + + + +PG_DSN = dict(database="dataops_db", + user ="dbuser_dba", + password="EmBRxnmmjnE3", + host ="124.221.232.219", + port ="5432") + + +def save_json_to_pg(data: list, api_id: str) -> None: + """把列表落库:先软删历史,再插入新批次""" + print('[save_to_pg] 写入 PG...') + sql = """ + UPDATE data_api.api_data + SET is_loaded = '1' + WHERE api_id = %s; + + INSERT INTO data_api.api_data + (id, api_id, data, total_num, is_loaded, status, + request_tm, execute_tm, remark) + VALUES (%s, %s, %s, %s, '0', '0', + current_timestamp(0), current_timestamp(0), ''); + """ + + try: + with psycopg2.connect(**PG_DSN) as conn: + with conn.cursor() as cur: + cur.execute(sql, + (api_id, + str(uuid.uuid4()), + api_id, + json.dumps(data, ensure_ascii=False), + len(data))) + conn.commit() + cur.close() + print('[save_to_pg] 写入完成') + except psycopg2.Error as e: + print(f'[save_to_pg] 数据库错误: {e}') + raise + except Exception as e: + print(f'[save_to_pg] 未知错误: {e}') + raise + finally: + if 'conn' in locals(): + conn.close() + +def main() -> None: + """主流程""" + # print(get_token() + print(f'开始请求新建联系人信息:{formatted2_previous_hour(0)}') + token = get_token() + # print(token) + # 获取新建联系人ID + print(f'开始请求新建联系人ID:{formatted2_previous_hour(0)}') + ids = get_contact_ids(token) + # 保存联系人ID + apiId_ids = 'e8071e23-0ced-4ba9-8cac-fd7828df' + save_json_to_pg(ids, apiId_ids) + # print(ids) + # 获取联系人详情 + print(f'开始请求联系人详情:{formatted2_previous_hour(0)}') + objs = get_contact_detail(token, ids) + # 保存联系人详情 + apiId = 'ff6f11b4-d49c-47d8-b870-7082e438' + save_json_to_pg(objs, apiId) + print(f'结束请求联系人详情:{formatted2_previous_hour(0)}') + + +if __name__ == '__main__': + main() + diff --git a/dev/workflow/TK_Cust/null/获取更新联系人信息/contact_update_load.sql b/dev/workflow/TK_Cust/null/获取更新联系人信息/contact_update_load.sql new file mode 100644 index 0000000..629b6f4 --- /dev/null +++ b/dev/workflow/TK_Cust/null/获取更新联系人信息/contact_update_load.sql @@ -0,0 +1,33 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +DELETE FROM data_api.api_contact_update_info; + +insert into data_api.api_contact_update_info ( + avatar + , id + , properties + ,etl_tx_dt +) +select + case when trim(both from avatar)='' then null else avatar::text end avatar + , case when trim(both from id)='' then null else id::text end id + , case when trim(both from properties)='' then null else properties::text end properties +,etl_tx_dt +from ( +select + (json_array_elements(data::json)::json->>'avatar') avatar + , (json_array_elements(data::json)::json->>'id') id + , (json_array_elements(data::json)::json->>'properties') properties + ,CURRENT_TIMESTAMP(0) etl_tx_dt +from (select * from data_api.api_data +WHERE api_id='ff6f11b4-d49c-47d8-b870-7082e438' and is_loaded = '0' order by request_tm desc limit 1) p )p; + +update data_api.api_data +set is_loaded = '1' , + status = '1', + request_tm = current_timestamp(0) +where api_id='ff6f11b4-d49c-47d8-b870-7082e438'; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/null/获取联系人事件/api_contact_events_feign.py b/dev/workflow/TK_Cust/null/获取联系人事件/api_contact_events_feign.py new file mode 100644 index 0000000..5319643 --- /dev/null +++ b/dev/workflow/TK_Cust/null/获取联系人事件/api_contact_events_feign.py @@ -0,0 +1,74 @@ +# coding: utf-8 +import requests +import json +import psycopg2 +import uuid +import datetime +import time +import hashlib +import time + +#荟聚 + +#全局变量,便于参数使用的预设值 +current_date = datetime.date.today() # 获取当前日期 +previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期 +formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化 +formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化 +timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数; +sign_version = 'v2' # 签名版本号,固定值v2 +nonce = str(uuid.uuid4()) + +#获取签名令牌 +def sign_data(email, open_api_token, timestamp, nonce, sign_version): + # 按照指定的格式拼接字符串 + data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}" + # 使用SHA256算法计算哈希值 + sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest() + return sha256_hash + +#获取鉴权token +def get_token(url): + #请求鉴权接口 + authRequest=requests.get(url) + #解析结果 + if not authRequest: #若为空时,返回空 + return + auth=json.loads(authRequest.text) + return auth + +print('开始加载数据:api_contact_events:获取联系人事件') +authUrl='https://open.cloud.custouch.com/platform/cdp/token' + +print('开始请求令牌。') +#authRequest=requests.get(authUrl) +#auth=json.loads(authRequest.text) +auth = get_token(authUrl) +#循环判断auth是否为空,若为空,等待30s后重新请求 +i = 0 +while 'error' in auth and i < 60: + time.sleep(60) + auth = get_token(authUrl) + i = i + 1 +print('开始请求数据总数。') +url='https://open.cloud.custouch.com/platform/cdp/contact/event' +header={} +body={'app_secret':auth['app_secret'],'app_key':auth['app_key'],'contactId':'',} +dataReqL=requests.get(url,headers=header,params=body) +resL=json.loads(dataReqL.text) +# print(resL) +dataList=resL['data'] +total=len(dataList) +conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA", + host="172.17.0.8", port="5432") +print('数据库连接成功') +dataId=str(uuid.uuid4()) +print('临时id:'+dataId) +json_object = json.dumps(dataList) +cur=conn.cursor() +sql="update data_api.api_data set is_loaded = '1' where api_id = 'a7757b4a-7038-40ef-b11e-81a2c5e0';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')" +cur.execute(sql,[dataId,'a7757b4a-7038-40ef-b11e-81a2c5e0',json_object,total]) +conn.commit() +cur.close() +conn.close() +print('加载数据结束:api_contact_events:获取联系人事件') \ No newline at end of file diff --git a/dev/workflow/TK_Cust/null/获取联系人事件/api_contact_events_load.sql b/dev/workflow/TK_Cust/null/获取联系人事件/api_contact_events_load.sql new file mode 100644 index 0000000..462e8bc --- /dev/null +++ b/dev/workflow/TK_Cust/null/获取联系人事件/api_contact_events_load.sql @@ -0,0 +1,57 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +DELETE FROM data_api.api_contact_events; + +insert into data_api.api_contact_events ( + at + , channel_id + , channel_name + , child_events + , connect_id + , connect_name + , id + , meta_id + , meta_name + , meta_remark + , properties + ,etl_tx_dt +) +select + case when trim(both from at)='' then null else at::text end at + , case when trim(both from channel_id)='' then null else channel_id::text end channel_id + , case when trim(both from channel_name)='' then null else channel_name::text end channel_name + , case when trim(both from child_events)='' then null else child_events::text end child_events + , case when trim(both from connect_id)='' then null else connect_id::text end connect_id + , case when trim(both from connect_name)='' then null else connect_name::text end connect_name + , case when trim(both from id)='' then null else id::text end id + , case when trim(both from meta_id)='' then null else meta_id::text end meta_id + , case when trim(both from meta_name)='' then null else meta_name::text end meta_name + , case when trim(both from meta_remark)='' then null else meta_remark::text end meta_remark + , case when trim(both from properties)='' then null else properties::text end properties +,etl_tx_dt +from ( +select + (json_array_elements(data::json)::json->>'at') at + , (json_array_elements(data::json)::json->>'channelId') channel_id + , (json_array_elements(data::json)::json->>'channelName') channel_name + , (json_array_elements(data::json)::json->>'childEvents') child_events + , (json_array_elements(data::json)::json->>'connectId') connect_id + , (json_array_elements(data::json)::json->>'connectName') connect_name + , (json_array_elements(data::json)::json->>'id') id + , (json_array_elements(data::json)::json->>'metaId') meta_id + , (json_array_elements(data::json)::json->>'metaName') meta_name + , (json_array_elements(data::json)::json->>'metaRemark') meta_remark + , (json_array_elements(data::json)::json->>'properties') properties + ,CURRENT_TIMESTAMP(0) etl_tx_dt +from (select * from data_api.api_data +WHERE api_id='a7757b4a-7038-40ef-b11e-81a2c5e0' and is_loaded = '0' order by request_tm desc limit 1) p )p; + +update data_api.api_data +set is_loaded = '1' , + status = '1', + request_tm = current_timestamp(0) +where api_id='a7757b4a-7038-40ef-b11e-81a2c5e0'; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/null/获取联系人属性列表/contact_fields_feign.py b/dev/workflow/TK_Cust/null/获取联系人属性列表/contact_fields_feign.py new file mode 100644 index 0000000..0fd5a0e --- /dev/null +++ b/dev/workflow/TK_Cust/null/获取联系人属性列表/contact_fields_feign.py @@ -0,0 +1,128 @@ +# coding: utf-8 +import requests +import json +import datetime as dt +import psycopg2 +import uuid + +""" +获取指定时间段前的时间 +:param h: 时间段 +:return: 时间 +""" +def formatted2_previous_hour(h): + if h==0: + return dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + start_of_current_hour = dt.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - dt.timedelta(hours=h) + return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S") + +""" +获取token +:return: token +""" +def get_token(): + url = "https://open.cloud.custouch.com/platform/cdp/token" + token_payload = { + 'grant_type':'client_credentials', + 'app_key':'e9b240eb3a9848e89a96c5e7857794da', + 'app_secret':'f8cb7069e7dd468888e360bf8c259fc6', + 'scope':'openid' + } + headers = { + 'Content-Type': 'application/x-www-form-urlencoded' + } + response = requests.request("POST", url, headers=headers, data=token_payload) + if response.status_code != 200: + raise Exception("获取token失败") + + res = json.loads(response.text) + return res['access_token'] + +""" +获取API数据 +:param token: token +:return: contact_ids +""" +def fetch_data(token): + url = "https://open.cloud.custouch.com/platform/cdp/contact/field" + params = { + 'isIgnoreSurname':'false', + } + headers = { + 'Authorization': f'Bearer {token}' + } + + response = requests.request("GET", url, headers=headers, params=params) + if response.status_code != 200: + raise Exception("获取新建联系人ID失败") + + res = json.loads(response.text) + return res + + +PG_DSN = dict(database="dataops_db", + user ="dbuser_dba", + password="EmBRxnmmjnE3", + host ="124.221.232.219", + port ="5432") + + +def save_json_to_pg(data: list, api_id: str) -> None: + """把列表落库:先软删历史,再插入新批次""" + print('[save_to_pg] 写入 PG...') + sql = """ + UPDATE data_api.api_data + SET is_loaded = '1' + WHERE api_id = %s; + + INSERT INTO data_api.api_data + (id, api_id, data, total_num, is_loaded, status, + request_tm, execute_tm, remark) + VALUES (%s, %s, %s, %s, '0', '0', + current_timestamp(0), current_timestamp(0), ''); + """ + + try: + with psycopg2.connect(**PG_DSN) as conn: + with conn.cursor() as cur: + cur.execute(sql, + (api_id, + str(uuid.uuid4()), + api_id, + json.dumps(data, ensure_ascii=False), + len(data))) + conn.commit() + cur.close() + print('[save_to_pg] 写入完成') + except psycopg2.Error as e: + print(f'[save_to_pg] 数据库错误: {e}') + raise + except Exception as e: + print(f'[save_to_pg] 未知错误: {e}') + raise + finally: + if 'conn' in locals(): + conn.close() + + + +def main() -> None: + """主流程""" + # print(get_token()) + print(f'开始请求新建联系人信息:{formatted2_previous_hour(0)}') + token = get_token() + # print(token) + # 获取新建联系人ID + print(f'开始请求新建联系人ID:{formatted2_previous_hour(0)}') + objs = fetch_data(token) + # 保存联系人ID + apiId = '1a11459f-5bb2-47b9-826f-9105e05f' + save_json_to_pg(objs, apiId) + print(f'结束请求联系人详情:{formatted2_previous_hour(0)}') + + +if __name__ == '__main__': + main() + diff --git a/dev/workflow/TK_Cust/null/获取联系人属性列表/contact_fields_load.sql b/dev/workflow/TK_Cust/null/获取联系人属性列表/contact_fields_load.sql new file mode 100644 index 0000000..3f3dab8 --- /dev/null +++ b/dev/workflow/TK_Cust/null/获取联系人属性列表/contact_fields_load.sql @@ -0,0 +1,45 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +DELETE FROM data_api.api_contact_fields; + +insert into data_api.api_contact_fields ( + data_type + , id + , is_enabled + , name + , remark + , sort + , type + ,etl_tx_dt +) +select + case when trim(both from data_type)='' then null else data_type::text end data_type + , case when trim(both from id)='' then null else id::text end id + , case when trim(both from is_enabled)='' then null else is_enabled::text end is_enabled + , case when trim(both from name)='' then null else name::text end name + , case when trim(both from remark)='' then null else remark::text end remark + , case when trim(both from sort)='' then null else sort::text end sort + , case when trim(both from type)='' then null else type::text end type +,etl_tx_dt +from ( +select + (json_array_elements(data::json)::json->>'dataType') data_type + , (json_array_elements(data::json)::json->>'id') id + , (json_array_elements(data::json)::json->>'isEnabled') is_enabled + , (json_array_elements(data::json)::json->>'name') name + , (json_array_elements(data::json)::json->>'remark') remark + , (json_array_elements(data::json)::json->>'sort') sort + , (json_array_elements(data::json)::json->>'type') type + ,CURRENT_TIMESTAMP(0) etl_tx_dt +from (select * from data_api.api_data +WHERE api_id='1a11459f-5bb2-47b9-826f-9105e05f' and is_loaded = '0' order by request_tm desc limit 1) p )p; + +update data_api.api_data +set is_loaded = '1' , + status = '1', + request_tm = current_timestamp(0) +where api_id='1a11459f-5bb2-47b9-826f-9105e05f'; +\q \ No newline at end of file