diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/S98_S_tr_custom_details.sql b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/S98_S_tr_custom_details.sql new file mode 100644 index 0000000..77efd5b --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/S98_S_tr_custom_details.sql @@ -0,0 +1,174 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on +delete from p10_sa.S98_S_tr_custom_details +; +insert into p10_sa.S98_S_tr_custom_details +( id + , name + , sex + , tel + , email + , address + , level + , share_type + , share + , remark + , source + , creator_type + , creator_id + , modifier_type + , modifier_id + , last_contact_time + , last_contact_type + , customize + , create_time + , update_time + , visitor_ids + , creator_name + , modifier_name + , ib_bridged_number + , ob_bridged_number + , ob_number + , assign_time + , external_id + , ib_number + , retrieve + , retrieve_time + , queue_without_attribution + , phase_id + , phase_reason_id + , promote_source + , repeat_promote_count + , last_repeat_promote_time + , label_ids + , etl_tx_dt ) + select + id + , name + , sex + , tel + , email + , address + , level + , share_type + , share + , remark + , source + , creator_type + , creator_id + , modifier_type + , modifier_id + , last_contact_time + , last_contact_type + , customize + , create_time + , update_time + , visitor_ids + , creator_name + , modifier_name + , ib_bridged_number + , ob_bridged_number + , ob_number + , assign_time + , external_id + , ib_number + , retrieve + , retrieve_time + , queue_without_attribution + , phase_id + , phase_reason_id + , promote_source + , repeat_promote_count + , last_repeat_promote_time + , label_ids + , etl_tx_dt + from p00_tal.S98_S_tr_custom_details + ; + delete from p12_sfull.S98_S_tr_custom_details +; +; +insert into p12_sfull.S98_S_tr_custom_details +( id + , name + , sex + , tel + , email + , address + , level + , share_type + , share + , remark + , source + , creator_type + , creator_id + , modifier_type + , modifier_id + , last_contact_time + , last_contact_type + , customize + , create_time + , update_time + , visitor_ids + , creator_name + , modifier_name + , ib_bridged_number + , ob_bridged_number + , ob_number + , assign_time + , external_id + , ib_number + , retrieve + , retrieve_time + , queue_without_attribution + , phase_id + , phase_reason_id + , promote_source + , repeat_promote_count + , last_repeat_promote_time + , label_ids + , etl_tx_dt ) + select + id + , name + , sex + , tel + , email + , address + , level + , share_type + , share + , remark + , source + , creator_type + , creator_id + , modifier_type + , modifier_id + , last_contact_time + , last_contact_type + , customize + , create_time + , update_time + , visitor_ids + , creator_name + , modifier_name + , ib_bridged_number + , ob_bridged_number + , ob_number + , assign_time + , external_id + , ib_number + , retrieve + , retrieve_time + , queue_without_attribution + , phase_id + , phase_reason_id + , promote_source + , repeat_promote_count + , last_repeat_promote_time + , label_ids + , etl_tx_dt + from p10_sa.S98_S_tr_custom_details +; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/sa_foreign_tables.sql b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/sa_foreign_tables.sql new file mode 100644 index 0000000..2ecd78c --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/sa_foreign_tables.sql @@ -0,0 +1,50 @@ + +CREATE FOREIGN TABLE if not exists p00_tal.S98_S_tr_custom_details ( + id TEXT + , name TEXT + , sex TEXT + , tel TEXT + , email TEXT + , address TEXT + , level TEXT + , share_type TEXT + , share TEXT + , remark TEXT + , source TEXT + , creator_type TEXT + , creator_id TEXT + , modifier_type TEXT + , modifier_id TEXT + , last_contact_time TEXT + , last_contact_type TEXT + , customize TEXT + , create_time TEXT + , update_time TEXT + , visitor_ids TEXT + , creator_name TEXT + , modifier_name TEXT + , ib_bridged_number TEXT + , ob_bridged_number TEXT + , ob_number TEXT + , assign_time TEXT + , external_id TEXT + , ib_number TEXT + , retrieve TEXT + , retrieve_time TEXT + , queue_without_attribution TEXT + , phase_id TEXT + , phase_reason_id TEXT + , promote_source TEXT + , repeat_promote_count TEXT + , last_repeat_promote_time TEXT + , label_ids TEXT + , etl_tx_dt TIMESTAMP +) + + +SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'tr_custom_details' ); + + + + + diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/sa_tables.sql b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/sa_tables.sql new file mode 100644 index 0000000..aa2c811 --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_details/sa_tables.sql @@ -0,0 +1,171 @@ + +create table if not exists p10_sa.S98_S_tr_custom_details ( + id TEXT + , name TEXT + , sex TEXT + , tel TEXT + , email TEXT + , address TEXT + , level TEXT + , share_type TEXT + , share TEXT + , remark TEXT + , source TEXT + , creator_type TEXT + , creator_id TEXT + , modifier_type TEXT + , modifier_id TEXT + , last_contact_time TEXT + , last_contact_type TEXT + , customize TEXT + , create_time TEXT + , update_time TEXT + , visitor_ids TEXT + , creator_name TEXT + , modifier_name TEXT + , ib_bridged_number TEXT + , ob_bridged_number TEXT + , ob_number TEXT + , assign_time TEXT + , external_id TEXT + , ib_number TEXT + , retrieve TEXT + , retrieve_time TEXT + , queue_without_attribution TEXT + , phase_id TEXT + , phase_reason_id TEXT + , promote_source TEXT + , repeat_promote_count TEXT + , last_repeat_promote_time TEXT + , label_ids TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.id IS '客户资料id'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.name IS '客户名称'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.sex IS '客户性别'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.tel IS '客户号码'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.email IS '邮箱'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.address IS '地址'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.level IS '客户等级'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.share_type IS '归属类型,0:全体共享、1:员工组共享、2:员工私有、3:无归属'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.share IS '客户归属'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.remark IS '备注'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.source IS '客户来源'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.creator_type IS '创建人类型'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.creator_id IS '创建人id,-1:openApi'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.modifier_type IS '更新人类型'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.modifier_id IS '更新人id,-1:openApi'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.last_contact_time IS '最后一次联系时间'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.last_contact_type IS '最后一次联系类型'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.customize IS '该对象中,id为自定义字段id, name为自定义字段名称, value为自定义字段值'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.create_time IS '创建时间'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.update_time IS '更新时间'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.visitor_ids IS '访客ids'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.creator_name IS '创建人名称'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.modifier_name IS '更新人名称'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.ib_bridged_number IS '呼入接通次数'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.ob_bridged_number IS '呼出接通次数'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.ob_number IS '呼出次数'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.assign_time IS '分配时间'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.external_id IS '外部企业客户ID (第三方平台 ID)'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.ib_number IS '呼入次数'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.retrieve IS '是否为回收客户'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.retrieve_time IS '回收时间'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.queue_without_attribution IS '无归属授权员工组'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.phase_id IS '客户阶段id'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.phase_reason_id IS '阶段原因id'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.promote_source IS '推广来源'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.repeat_promote_count IS '重复推广次数'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.last_repeat_promote_time IS '最近一次重复推广时间'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.label_ids IS '客户标签id'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.etl_tx_dt IS ''; + +COMMENT ON TABLE p10_sa.S98_S_tr_custom_details IS '客户资料详情'; + + + +create table if not exists p12_sfull.S98_S_tr_custom_details ( + id TEXT + , name TEXT + , sex TEXT + , tel TEXT + , email TEXT + , address TEXT + , level TEXT + , share_type TEXT + , share TEXT + , remark TEXT + , source TEXT + , creator_type TEXT + , creator_id TEXT + , modifier_type TEXT + , modifier_id TEXT + , last_contact_time TEXT + , last_contact_type TEXT + , customize TEXT + , create_time TEXT + , update_time TEXT + , visitor_ids TEXT + , creator_name TEXT + , modifier_name TEXT + , ib_bridged_number TEXT + , ob_bridged_number TEXT + , ob_number TEXT + , assign_time TEXT + , external_id TEXT + , ib_number TEXT + , retrieve TEXT + , retrieve_time TEXT + , queue_without_attribution TEXT + , phase_id TEXT + , phase_reason_id TEXT + , promote_source TEXT + , repeat_promote_count TEXT + , last_repeat_promote_time TEXT + , label_ids TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.id IS '客户资料id'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.name IS '客户名称'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.sex IS '客户性别'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.tel IS '客户号码'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.email IS '邮箱'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.address IS '地址'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.level IS '客户等级'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.share_type IS '归属类型,0:全体共享、1:员工组共享、2:员工私有、3:无归属'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.share IS '客户归属'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.remark IS '备注'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.source IS '客户来源'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.creator_type IS '创建人类型'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.creator_id IS '创建人id,-1:openApi'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.modifier_type IS '更新人类型'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.modifier_id IS '更新人id,-1:openApi'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.last_contact_time IS '最后一次联系时间'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.last_contact_type IS '最后一次联系类型'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.customize IS '该对象中,id为自定义字段id, name为自定义字段名称, value为自定义字段值'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.create_time IS '创建时间'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.update_time IS '更新时间'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.visitor_ids IS '访客ids'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.creator_name IS '创建人名称'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.modifier_name IS '更新人名称'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.ib_bridged_number IS '呼入接通次数'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.ob_bridged_number IS '呼出接通次数'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.ob_number IS '呼出次数'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.assign_time IS '分配时间'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.external_id IS '外部企业客户ID (第三方平台 ID)'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.ib_number IS '呼入次数'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.retrieve IS '是否为回收客户'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.retrieve_time IS '回收时间'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.queue_without_attribution IS '无归属授权员工组'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.phase_id IS '客户阶段id'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.phase_reason_id IS '阶段原因id'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.promote_source IS '推广来源'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.repeat_promote_count IS '重复推广次数'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.last_repeat_promote_time IS '最近一次重复推广时间'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.label_ids IS '客户标签id'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.etl_tx_dt IS ''; + +COMMENT ON TABLE p12_sfull.S98_S_tr_custom_details IS '客户资料详情'; + diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_labels/S98_S_tr_custom_labels.sql b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_labels/S98_S_tr_custom_labels.sql new file mode 100644 index 0000000..b809fcc --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_labels/S98_S_tr_custom_labels.sql @@ -0,0 +1,30 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on +delete from p10_sa.S98_S_tr_custom_labels +; +insert into p10_sa.S98_S_tr_custom_labels +( label_group_name + , customer_label_list + , etl_tx_dt ) + select + label_group_name + , customer_label_list + , etl_tx_dt + from p00_tal.S98_S_tr_custom_labels + ; + delete from p12_sfull.S98_S_tr_custom_labels +; +; +insert into p12_sfull.S98_S_tr_custom_labels +( label_group_name + , customer_label_list + , etl_tx_dt ) + select + label_group_name + , customer_label_list + , etl_tx_dt + from p10_sa.S98_S_tr_custom_labels +; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_labels/sa_foreign_tables.sql b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_labels/sa_foreign_tables.sql new file mode 100644 index 0000000..e25a54e --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_labels/sa_foreign_tables.sql @@ -0,0 +1,14 @@ + +CREATE FOREIGN TABLE if not exists p00_tal.S98_S_tr_custom_labels ( + label_group_name TEXT + , customer_label_list TEXT + , etl_tx_dt TIMESTAMP +) + + +SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'tr_custom_labels' ); + + + + + diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_labels/sa_tables.sql b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_labels/sa_tables.sql new file mode 100644 index 0000000..a4563f7 --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/tr_custom_labels/sa_tables.sql @@ -0,0 +1,27 @@ + +create table if not exists p10_sa.S98_S_tr_custom_labels ( + label_group_name TEXT + , customer_label_list TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_labels.label_group_name IS '标签分组名称'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_labels.customer_label_list IS '分组下的标签对象数组'; + COMMENT ON COLUMN p10_sa.S98_S_tr_custom_labels.etl_tx_dt IS ''; + +COMMENT ON TABLE p10_sa.S98_S_tr_custom_labels IS '客户标签列表'; + + + +create table if not exists p12_sfull.S98_S_tr_custom_labels ( + label_group_name TEXT + , customer_label_list TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_labels.label_group_name IS '标签分组名称'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_labels.customer_label_list IS '分组下的标签对象数组'; + COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_labels.etl_tx_dt IS ''; + +COMMENT ON TABLE p12_sfull.S98_S_tr_custom_labels IS '客户标签列表'; + diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/天润Smart-ccc客户数据/wf_dag_smart_ccc_custom.py b/dev/workflow/TK_Cust/smart_ccc_custom/天润Smart-ccc客户数据/wf_dag_smart_ccc_custom.py new file mode 100644 index 0000000..51a4aa6 --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/天润Smart-ccc客户数据/wf_dag_smart_ccc_custom.py @@ -0,0 +1,90 @@ +#!/usr/bin/python +# -*- encoding=utf-8 -*- +from airflow import DAG +from datetime import datetime, timedelta +from airflow.contrib.hooks.ssh_hook import SSHHook +from airflow.contrib.operators.ssh_operator import SSHOperator +from airflow.sensors.external_task_sensor import ExternalTaskSensor +import json + +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.operators.email_operator import EmailOperator +from airflow.utils.trigger_rule import TriggerRule + + +sshHook = SSHHook(ssh_conn_id ='ssh_air') +default_args = { +'owner': 'info@idgvalue.com', +'email_on_failure': True, +'email_on_retry':True, +'start_date': datetime(2024, 1, 1), +'depends_on_past': False, +'retries': 6, +'retry_delay': timedelta(minutes=10), +} + +dag = DAG('wf_dag_smart_ccc_custom', default_args=default_args, +schedule_interval="0 0-23/1 * * *", +catchup=False, +dagrun_timeout=timedelta(minutes=160), +max_active_runs=3) + +task_failed = EmailOperator ( + dag=dag, + trigger_rule=TriggerRule.ONE_FAILED, + task_id="task_failed", + to=["info@idgvalue.com"], + cc=[""], + subject="smart_ccc_custom_failed", + html_content='

您好,smart_ccc_custom作业失败,请及时处理"

') + +customer_list_feign = SSHOperator( +ssh_hook=sshHook, +task_id='customer_list_feign', +command='python3 /data/airflow/etl/API/customer_list_feign.py', +depends_on_past=False, +retries=3, +dag=dag) + + +customer_labels_feign = SSHOperator( +ssh_hook=sshHook, +task_id='customer_labels_feign', +command='python3 /data/airflow/etl/API/customer_labels_feign.py', +depends_on_past=False, +retries=3, +dag=dag) + +customer_labels_load = SSHOperator( +ssh_hook=sshHook, +task_id='customer_labels_load', +command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}', +params={'my_param':"customer_labels_load"}, +depends_on_past=False, +retries=3, +dag=dag) + +customer_labels_feign >> customer_labels_load + +tr_custom_details_5516 = SSHOperator( +ssh_hook=sshHook, +task_id='tr_custom_details_5516', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_tr_custom_details"}, +depends_on_past=False, +retries=3, +dag=dag) + +tr_custom_labels_8280 = SSHOperator( +ssh_hook=sshHook, +task_id='tr_custom_labels_8280', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_tr_custom_labels"}, +depends_on_past=False, +retries=3, +dag=dag) + +customer_labels_load >> tr_custom_labels_8280 +customer_list_feign >> tr_custom_details_5516 +tr_custom_details_5516 >> task_failed +tr_custom_labels_8280 >> task_failed diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/查询客户资料可用标签/customer_labels_feign.py b/dev/workflow/TK_Cust/smart_ccc_custom/查询客户资料可用标签/customer_labels_feign.py new file mode 100644 index 0000000..da0e642 --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/查询客户资料可用标签/customer_labels_feign.py @@ -0,0 +1,123 @@ +# coding: utf-8 +import requests +import json +import psycopg2 +import uuid +import datetime +import time +import hashlib +import time +import hmac +import base64 +import urllib.parse +import hashlib +from collections import OrderedDict +from urllib.parse import quote_plus + +#全局变量,便于参数使用的预设值 +current_date = datetime.date.today() # 获取当前日期 +previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期 +formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化 +formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化 +timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数; +sign_version = 'v2' # 签名版本号,固定值v2 +nonce = str(uuid.uuid4()) +current_time_utc =( datetime.datetime.now() - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%SZ") + +formatted2_current_date = current_date.strftime("%Y-%m-%d %H:%M:%S") # 获取当前日期 - 标准化 +formatted2_previous_date = previous_date.strftime("%Y-%m-%d %H:%M:%S") # 获取前一天日期 - 标准化 + +def formatted2_previous_hour(h): + if h==0: + return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h) + return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S") + +def previous_hour_timestamp(h): + if h==0: + return int(time.time()) + start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h) + return int(start_of_previous_hour.timestamp()) + +#计算签名 +def generate_signature(str, private_key): + signature = hmac.new(private_key.encode(), (str).encode(), hashlib.sha1) + signature_b64 = base64.b64encode(signature.digest()).decode() + return signature_b64 + +#构建查询链接 +def build_query_string(params): + # 使用OrderedDict来保持排序 + sorted_params = OrderedDict(sorted(params.items())) + + # 拼接属性名和属性值,并使用&连接 + query_string = '&'.join('{}={}'.format( + urllib.parse.quote_plus(k), + urllib.parse.quote_plus(str(v)) + ) for k, v in sorted_params.items()) + return query_string + + + +def request_data_signature_post(): + print('开始请求数据...') + url='https://api-bj.clink.cn/crm/list_customer_labels' + header={} + param={'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','Timestamp':current_time_utc,'Expires':Expires,} + print(f'param: {param}') + paramJson = {"Timestamp":""} + print(f'paramJson: {paramJson}') + url_path = build_query_string(param) + url_param = build_query_string(param) + print(f'url_param: {url_param}') + url_param = f'POSTapi-bj.clink.cn/crm/list_customer_labels?{url_param}' + print(f'url_param2: {url_param}') + signature= generate_signature(url_param,'5g027B6w06630Y5240c1') + print(f'signature: {signature}') + url = f'{url}?{url_path}&Signature={signature}' + print(f'url: {url}') + body={} + print(f'body: {body}') + dataReqL=requests.post(url,headers=header,params=body) + i = 0 + while 'error' in dataReqL and i < 5: + time.sleep(1) + dataReqL=requests.post(url,headers=header,params=body) + i = i + 1 + resL=json.loads(dataReqL.text) + print(dataReqL) + resL=json.loads(dataReqL.text) + return resL + + +def load_data_to_db(dataList): + conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432") + print('数据库连接成功') + dataId=str(uuid.uuid4()) + total=len(dataList) + print('临时id:'+dataId) + json_object = json.dumps(dataList) + cur=conn.cursor() + sql="update data_api.api_data set is_loaded = '1' where api_id = '02d5a32d8736457fa51d668652cc50af';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')" + cur.execute(sql,[dataId,'02d5a32d8736457fa51d668652cc50af', json_object, total]) + conn.commit() + cur.close() + conn.close() + print('加载数据结束:customer_labels:查询客户资料可用标签') + + + +if __name__ == "__main__": + + + resL = request_data_signature_post() + print(resL) + + + if 'error' in resL: + load_error_to_db(resl) + load_data_to_db(resL[''customerLabels']) \ No newline at end of file diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/查询客户资料可用标签/customer_labels_load.sql b/dev/workflow/TK_Cust/smart_ccc_custom/查询客户资料可用标签/customer_labels_load.sql new file mode 100644 index 0000000..37a2209 --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/查询客户资料可用标签/customer_labels_load.sql @@ -0,0 +1,30 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +DELETE FROM data_api.tr_custom_labels; + +insert into data_api.tr_custom_labels ( + label_group_name + , customer_label_list + ,etl_tx_dt +) +select + case when trim(both from label_group_name)='' then null else label_group_name::text end label_group_name + , case when trim(both from customer_label_list)='' then null else customer_label_list::text end customer_label_list +,etl_tx_dt +from ( +select + (json_array_elements(data::json)::json->>'labelGroupName') label_group_name + , (json_array_elements(data::json)::json->>'customerLabelList') customer_label_list + ,CURRENT_TIMESTAMP(0) etl_tx_dt +from (select * from data_api.api_data +WHERE api_id='02d5a32d8736457fa51d668652cc50af' and is_loaded = '0' order by request_tm desc limit 1) p )p; + +update data_api.api_data +set is_loaded = '1' , + status = '1', + request_tm = current_timestamp(0) +where api_id='02d5a32d8736457fa51d668652cc50af'; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料信息/customer_detail_feign.py b/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料信息/customer_detail_feign.py new file mode 100644 index 0000000..ef95ad1 --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料信息/customer_detail_feign.py @@ -0,0 +1,123 @@ +# coding: utf-8 +import requests +import json +import psycopg2 +import uuid +import datetime +import time +import hashlib +import time +import hmac +import base64 +import urllib.parse +import hashlib +from collections import OrderedDict +from urllib.parse import quote_plus + +#全局变量,便于参数使用的预设值 +current_date = datetime.date.today() # 获取当前日期 +previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期 +formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化 +formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化 +timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数; +sign_version = 'v2' # 签名版本号,固定值v2 +nonce = str(uuid.uuid4()) +current_time_utc =( datetime.datetime.now() - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%SZ") + +formatted2_current_date = current_date.strftime("%Y-%m-%d %H:%M:%S") # 获取当前日期 - 标准化 +formatted2_previous_date = previous_date.strftime("%Y-%m-%d %H:%M:%S") # 获取前一天日期 - 标准化 + +def formatted2_previous_hour(h): + if h==0: + return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h) + return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S") + +def previous_hour_timestamp(h): + if h==0: + return int(time.time()) + start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h) + return int(start_of_previous_hour.timestamp()) + +#计算签名 +def generate_signature(str, private_key): + signature = hmac.new(private_key.encode(), (str).encode(), hashlib.sha1) + signature_b64 = base64.b64encode(signature.digest()).decode() + return signature_b64 + +#构建查询链接 +def build_query_string(params): + # 使用OrderedDict来保持排序 + sorted_params = OrderedDict(sorted(params.items())) + + # 拼接属性名和属性值,并使用&连接 + query_string = '&'.join('{}={}'.format( + urllib.parse.quote_plus(k), + urllib.parse.quote_plus(str(v)) + ) for k, v in sorted_params.items()) + return query_string + + + +def request_data_signature_post(): + print('开始请求数据...') + url='https://api-bj.clink.cn/crm/query_customer' + header={} + param={'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','Timestamp':current_time_utc,'Expires':'86400',} + print(f'param: {param}') + paramJson = {"Expires":"86400","Timestamp":""} + print(f'paramJson: {paramJson}') + url_path = build_query_string(param) + url_param = build_query_string(param) + print(f'url_param: {url_param}') + url_param = f'POSTapi-bj.clink.cn/crm/query_customer?{url_param}' + print(f'url_param2: {url_param}') + signature= generate_signature(url_param,'5g027B6w06630Y5240c1') + print(f'signature: {signature}') + url = f'{url}?{url_path}&Signature={signature}' + print(f'url: {url}') + body={'customerId':'33',} + print(f'body: {body}') + dataReqL=requests.post(url,headers=header,params=body) + i = 0 + while 'error' in dataReqL and i < 5: + time.sleep(1) + dataReqL=requests.post(url,headers=header,params=body) + i = i + 1 + resL=json.loads(dataReqL.text) + print(dataReqL) + resL=json.loads(dataReqL.text) + return resL + + +def load_data_to_db(dataList): + conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432") + print('数据库连接成功') + dataId=str(uuid.uuid4()) + total=len(dataList) + print('临时id:'+dataId) + json_object = json.dumps(dataList) + cur=conn.cursor() + sql="update data_api.api_data set is_loaded = '1' where api_id = '010d4668242c4b96b4964693edcf5556';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')" + cur.execute(sql,[dataId,'010d4668242c4b96b4964693edcf5556', json_object, total]) + conn.commit() + cur.close() + conn.close() + print('加载数据结束:customer_detail:获取客户资料信息') + + + +if __name__ == "__main__": + + + resL = request_data_signature_post() + print(resL) + + + if 'error' in resL: + load_error_to_db(resl) + load_data_to_db(resL[''customer']) \ No newline at end of file diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料信息/customer_detail_load.sql b/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料信息/customer_detail_load.sql new file mode 100644 index 0000000..d80d7a5 --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料信息/customer_detail_load.sql @@ -0,0 +1,138 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +DELETE FROM data_api.tr_custom_details; + +insert into data_api.tr_custom_details ( + id + , name + , sex + , tel + , email + , address + , level + , share_type + , share + , remark + , source + , creator_type + , creator_id + , modifier_type + , modifier_id + , last_contact_time + , last_contact_type + , customize + , create_time + , update_time + , visitor_ids + , creator_name + , modifier_name + , ib_bridged_number + , ob_bridged_number + , ob_number + , assign_time + , external_id + , ib_number + , retrieve + , retrieve_time + , queue_without_attribution + , phase_id + , phase_reason_id + , promote_source + , repeat_promote_count + , last_repeat_promote_time + , label_ids + ,etl_tx_dt +) +select + case when trim(both from id)='' then null else id::text end id + , case when trim(both from name)='' then null else name::text end name + , case when trim(both from sex)='' then null else sex::text end sex + , case when trim(both from tel)='' then null else tel::text end tel + , case when trim(both from email)='' then null else email::text end email + , case when trim(both from address)='' then null else address::text end address + , case when trim(both from level)='' then null else level::text end level + , case when trim(both from share_type)='' then null else share_type::text end share_type + , case when trim(both from share)='' then null else share::text end share + , case when trim(both from remark)='' then null else remark::text end remark + , case when trim(both from source)='' then null else source::text end source + , case when trim(both from creator_type)='' then null else creator_type::text end creator_type + , case when trim(both from creator_id)='' then null else creator_id::text end creator_id + , case when trim(both from modifier_type)='' then null else modifier_type::text end modifier_type + , case when trim(both from modifier_id)='' then null else modifier_id::text end modifier_id + , case when trim(both from last_contact_time)='' then null else last_contact_time::text end last_contact_time + , case when trim(both from last_contact_type)='' then null else last_contact_type::text end last_contact_type + , case when trim(both from customize)='' then null else customize::text end customize + , case when trim(both from create_time)='' then null else create_time::text end create_time + , case when trim(both from update_time)='' then null else update_time::text end update_time + , case when trim(both from visitor_ids)='' then null else visitor_ids::text end visitor_ids + , case when trim(both from creator_name)='' then null else creator_name::text end creator_name + , case when trim(both from modifier_name)='' then null else modifier_name::text end modifier_name + , case when trim(both from ib_bridged_number)='' then null else ib_bridged_number::text end ib_bridged_number + , case when trim(both from ob_bridged_number)='' then null else ob_bridged_number::text end ob_bridged_number + , case when trim(both from ob_number)='' then null else ob_number::text end ob_number + , case when trim(both from assign_time)='' then null else assign_time::text end assign_time + , case when trim(both from external_id)='' then null else external_id::text end external_id + , case when trim(both from ib_number)='' then null else ib_number::text end ib_number + , case when trim(both from retrieve)='' then null else retrieve::text end retrieve + , case when trim(both from retrieve_time)='' then null else retrieve_time::text end retrieve_time + , case when trim(both from queue_without_attribution)='' then null else queue_without_attribution::text end queue_without_attribution + , case when trim(both from phase_id)='' then null else phase_id::text end phase_id + , case when trim(both from phase_reason_id)='' then null else phase_reason_id::text end phase_reason_id + , case when trim(both from promote_source)='' then null else promote_source::text end promote_source + , case when trim(both from repeat_promote_count)='' then null else repeat_promote_count::text end repeat_promote_count + , case when trim(both from last_repeat_promote_time)='' then null else last_repeat_promote_time::text end last_repeat_promote_time + , case when trim(both from label_ids)='' then null else label_ids::text end label_ids +,etl_tx_dt +from ( +select + (json_array_elements(data::json)::json->>'id') id + , (json_array_elements(data::json)::json->>'name') name + , (json_array_elements(data::json)::json->>'sex') sex + , (json_array_elements(data::json)::json->>'tel') tel + , (json_array_elements(data::json)::json->>'email') email + , (json_array_elements(data::json)::json->>'address') address + , (json_array_elements(data::json)::json->>'level') level + , (json_array_elements(data::json)::json->>'shareType') share_type + , (json_array_elements(data::json)::json->>'share') share + , (json_array_elements(data::json)::json->>'remark') remark + , (json_array_elements(data::json)::json->>'source') source + , (json_array_elements(data::json)::json->>'creatorType') creator_type + , (json_array_elements(data::json)::json->>'creatorId') creator_id + , (json_array_elements(data::json)::json->>'modifierType') modifier_type + , (json_array_elements(data::json)::json->>'modifierId') modifier_id + , (json_array_elements(data::json)::json->>'lastContactTime') last_contact_time + , (json_array_elements(data::json)::json->>'lastContactType') last_contact_type + , (json_array_elements(data::json)::json->>'customize') customize + , (json_array_elements(data::json)::json->>'createTime') create_time + , (json_array_elements(data::json)::json->>'updateTime') update_time + , (json_array_elements(data::json)::json->>'visitorIds') visitor_ids + , (json_array_elements(data::json)::json->>'creatorName') creator_name + , (json_array_elements(data::json)::json->>'modifierName') modifier_name + , (json_array_elements(data::json)::json->>'ibBridgedNumber') ib_bridged_number + , (json_array_elements(data::json)::json->>'obBridgedNumber') ob_bridged_number + , (json_array_elements(data::json)::json->>'obNumber') ob_number + , (json_array_elements(data::json)::json->>'assignTime') assign_time + , (json_array_elements(data::json)::json->>'externalId') external_id + , (json_array_elements(data::json)::json->>'ibNumber') ib_number + , (json_array_elements(data::json)::json->>'retrieve') retrieve + , (json_array_elements(data::json)::json->>'retrieveTime') retrieve_time + , (json_array_elements(data::json)::json->>'queueWithoutAttribution') queue_without_attribution + , (json_array_elements(data::json)::json->>'phaseId') phase_id + , (json_array_elements(data::json)::json->>'phaseReasonId') phase_reason_id + , (json_array_elements(data::json)::json->>'promoteSource') promote_source + , (json_array_elements(data::json)::json->>'repeatPromoteCount') repeat_promote_count + , (json_array_elements(data::json)::json->>'lastRepeatPromoteTime') last_repeat_promote_time + , (json_array_elements(data::json)::json->>'labelIds') label_ids + ,CURRENT_TIMESTAMP(0) etl_tx_dt +from (select * from data_api.api_data +WHERE api_id='010d4668242c4b96b4964693edcf5556' and is_loaded = '0' order by request_tm desc limit 1) p )p; + +update data_api.api_data +set is_loaded = '1' , + status = '1', + request_tm = current_timestamp(0) +where api_id='010d4668242c4b96b4964693edcf5556'; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料列表/customer_list_feign.py b/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料列表/customer_list_feign.py new file mode 100644 index 0000000..b104cdc --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料列表/customer_list_feign.py @@ -0,0 +1,221 @@ +# coding: utf-8 +import requests +import json +import psycopg2 +import uuid +import datetime +import time +import hashlib +import time +import hmac +import base64 +import urllib.parse +import hashlib +from collections import OrderedDict +from urllib.parse import quote_plus + +#全局变量,便于参数使用的预设值 +current_date = datetime.date.today() # 获取当前日期 +previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期 +formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化 +formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化 +timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数; +sign_version = 'v2' # 签名版本号,固定值v2 +nonce = str(uuid.uuid4()) +current_time_utc =( datetime.datetime.now() - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%SZ") + +formatted2_current_date = current_date.strftime("%Y-%m-%d %H:%M:%S") # 获取当前日期 - 标准化 +formatted2_previous_date = previous_date.strftime("%Y-%m-%d %H:%M:%S") # 获取前一天日期 - 标准化 + +def formatted2_previous_hour(h): + if h==0: + return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h) + return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S") + +def previous_hour_timestamp(h): + if h==0: + return int(time.time()) + start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0) + # 减去一个小时,得到前一个小时的开始时间 + start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h) + return int(start_of_previous_hour.timestamp()) + +#计算签名 +def generate_signature(str, private_key): + signature = hmac.new(private_key.encode(), (str).encode(), hashlib.sha1) + signature_b64 = base64.b64encode(signature.digest()).decode() + return signature_b64 + +#构建查询链接 +def build_query_string(params): + # 使用OrderedDict来保持排序 + sorted_params = OrderedDict(sorted(params.items())) + + # 拼接属性名和属性值,并使用&连接 + query_string = '&'.join('{}={}'.format( + urllib.parse.quote_plus(k), + urllib.parse.quote_plus(str(v)) + ) for k, v in sorted_params.items()) + return query_string + +#构建查询链接 +def build_query_string(params): + # 使用OrderedDict来保持排序 + sorted_params = OrderedDict(sorted(params.items())) + + # 拼接属性名和属性值,并使用&连接 + query_string = '&'.join('{}={}'.format( + urllib.parse.quote_plus(k), + urllib.parse.quote_plus(str(v)) + ) for k, v in sorted_params.items()) + return query_string + + +#计算签名,get请求 +def request_list_signature_get(): + print('开始请求数据...') + url='https://api-bj.clink.cn/crm/list_customers' + # param={'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','Timestamp':current_time_utc,'Expires':86400,'updateStartTime':previous_hour_timestamp(1),'updateEndTime':previous_hour_timestamp(0)} + param={'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','Timestamp':current_time_utc,'Expires':86400,'updateStartTime':1704038400,'updateEndTime':previous_hour_timestamp(0)} + print(f'param: {param}') + url_path = build_query_string(param) + url_param = url_path + print(f'url_param: {url_param}') + url_param = f'GETapi-bj.clink.cn/crm/list_customers?{url_param}' + print(f'待计算字符串: {url_param}') + signature= generate_signature(url_param,'5g027B6w06630Y5240c1') + print(f'计算签名: {signature}') + print(f'编码后签名: {urllib.parse.quote_plus(signature)}') + url = f'{url}?{url_path}&Signature={urllib.parse.quote_plus(signature)}' + print(f'url: {url}') + + dataReqL=requests.get(url,headers={},params={}) + resText = dataReqL.text + i = 0 + while 'error' in resText and i < 5: + print(f'请求客户资料列表失败,再次请求第{i+1}次') + time.sleep(1) + dataReqL=requests.get(url,headers={},params={}) + resText = dataReqL.text + i = i + 1 + resL=json.loads(resText) + return resL + +def request_detail_signature_post(customerId): + print(f'开始请求客户详情:{formatted2_previous_hour(0)}') + url='https://api-bj.clink.cn/crm/query_customer' + header={'Content-Type':'application/json;charset=UTF-8'} + param={'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','Timestamp':current_time_utc,'Expires':86400} + print(f'param: {param}') + url_path = build_query_string(param) + url_param = url_path + print(f'url_param: {url_param}') + url_param = f'POSTapi-bj.clink.cn/crm/query_customer?{url_param}' + print(f'待计算字符串: {url_param}') + signature= generate_signature(url_param,'5g027B6w06630Y5240c1') + print(f'计算签名: {signature}') + print(f'编码后签名: {urllib.parse.quote_plus(signature)}') + url = f'{url}?{url_path}&Signature={urllib.parse.quote_plus(signature)}' + print(f'url: {url}') + + body={'customerId':customerId} + jsonData = json.dumps(body) + print(f'body: {jsonData}') + dataReqL=requests.post(url,headers=header,data=jsonData) + resText = dataReqL.text + i = 0 + while 'error' in resText and i < 5: + print(f'请求客户详情失败,再次请求第{i+1}次') + time.sleep(1) + dataReqL=requests.post(url,headers=header,params=body) + i = i + 1 + resL=json.loads(resText) + return resL + +def load_data_to_db(dataList): + conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432") + print('数据库连接成功') + dataId=str(uuid.uuid4()) + total=len(dataList) + print('临时id:'+dataId) + json_object = json.dumps(dataList) + cur=conn.cursor() + sql="update data_api.api_data set is_loaded = '1' where api_id = 'c83284b6bbb148daa6b3d04173ab748f';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')" + cur.execute(sql,[dataId,'c83284b6bbb148daa6b3d04173ab748f', json_object, total]) + conn.commit() + cur.close() + conn.close() + print('加载数据结束:customer_list:获取客户资料列表') + +def load_detail_data_to_db(ids, dataList): + conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432") + print('数据库连接成功') + dataId=str(uuid.uuid4()) + total=len(dataList) + print('临时id:'+dataId) + json_object = json.dumps(dataList) + idstr = ','.join(ids) + cur=conn.cursor() + sql="update data_api.cc_details_ids_exp set is_loaded = '1' where api_id = '010d4668242c4b96b4964693edcf5556' and id in (%s); INSERT INTO data_api.tr_custom_details (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')" + cur.execute(sql,[idstr,dataId,'010d4668242c4b96b4964693edcf5556', json_object, total]) + conn.commit() + cur.close() + conn.close() + print('加载数据结束:tickets_detail:获取客户资料详情') + +def load_detail_exp_to_db(id): + try: + print(f'添加查询客户资料异常记录:{id}') + conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432") + print('数据库连接成功') + dataId=str(uuid.uuid4()) + print('临时id:'+dataId) + cur=conn.cursor() + sql=" INSERT INTO data_api.cc_details_ids_exp (id,api_id,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')" + cur.execute(sql,[id, '010d4668242c4b96b4964693edcf5556']) + conn.commit() + cur.close() + conn.close() + print(f'添加查询客户资料异常记录:{id} 结束') + except Exception as e: + print(f'添加查询客户资料异常记录:{id}失败, 错误信息:{e}') + +if __name__ == "__main__": + print(f'{formatted2_previous_hour(0)}开始请求客户资料信息') + resL = request_list_signature_get() + print(resL) + if 'error' in resL: + error = resL['error'] + print(f'请求客户资料列表失败,失败原因:{error}') + else: + dataList = resL['customers'] + load_data_to_db(dataList) + detailDataList = [] + ids = [] + for data in dataList: + try: + for item in data: + if item['key'] == -1: + id=item['value'] + print(f'客户id:{id},开始请求数据') + resD = request_detail_signature_post(id) + print(f'请求数据结束{id},结果:{resD}') + if 'customer' in resD: + ids.append(id) + dataList = resD['customer'] + detailDataList.append(dataList) + else: + error = resD['error'] + print(f"请求客户资料详情(id:{id})失败,错误信息:{error}") + load_detail_exp_to_db(id) + except Exception as e: + print(f'请求客户资料详情(id:{id})异常, )异常信息:{e}') + load_detail_exp_to_db(data['id']) + print(f'444:{ids}') + if len(ids) > 0: + ids_str = [str(item) for item in ids] + load_detail_data_to_db(ids_str,detailDataList) + print(f'{formatted2_previous_hour(0)}请求客户资料信息结束') \ No newline at end of file diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料列表/customer_list_load.sql b/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料列表/customer_list_load.sql new file mode 100644 index 0000000..ffaa088 --- /dev/null +++ b/dev/workflow/TK_Cust/smart_ccc_custom/获取客户资料列表/customer_list_load.sql @@ -0,0 +1,117 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +DELETE FROM data_api.tr_ticket_list; + +insert into data_api.tr_ticket_list ( + id + , workflow_id + , workflow_name + , type + , topic + , level + , status + , creator_name + , creator_id + , creator_type + , modifier_id + , modifier_type + , source + , timeout + , end_time + , create_time + , close_time + , state_selected + , last_reminder_time + , reminder_count + , customer_id + , customer_name + , customer_tel + , customer_email + , customer_address + , customer_creator_id + , customer_creator_name + , customer_modifier_id + , customer_modifier_name + , tags + , system_form + ,etl_tx_dt +) +select + case when trim(both from id)='' then null else id::text end id + , case when trim(both from workflow_id)='' then null else workflow_id::text end workflow_id + , case when trim(both from workflow_name)='' then null else workflow_name::text end workflow_name + , case when trim(both from type)='' then null else type::text end type + , case when trim(both from topic)='' then null else topic::text end topic + , case when trim(both from level)='' then null else level::text end level + , case when trim(both from status)='' then null else status::text end status + , case when trim(both from creator_name)='' then null else creator_name::text end creator_name + , case when trim(both from creator_id)='' then null else creator_id::text end creator_id + , case when trim(both from creator_type)='' then null else creator_type::text end creator_type + , case when trim(both from modifier_id)='' then null else modifier_id::text end modifier_id + , case when trim(both from modifier_type)='' then null else modifier_type::text end modifier_type + , case when trim(both from source)='' then null else source::text end source + , case when trim(both from timeout)='' then null else timeout::text end timeout + , case when trim(both from end_time)='' then null else end_time::text end end_time + , case when trim(both from create_time)='' then null else create_time::text end create_time + , case when trim(both from close_time)='' then null else close_time::text end close_time + , case when trim(both from state_selected)='' then null else state_selected::text end state_selected + , case when trim(both from last_reminder_time)='' then null else last_reminder_time::text end last_reminder_time + , case when trim(both from reminder_count)='' then null else reminder_count::text end reminder_count + , case when trim(both from customer_id)='' then null else customer_id::text end customer_id + , case when trim(both from customer_name)='' then null else customer_name::text end customer_name + , case when trim(both from customer_tel)='' then null else customer_tel::text end customer_tel + , case when trim(both from customer_email)='' then null else customer_email::text end customer_email + , case when trim(both from customer_address)='' then null else customer_address::text end customer_address + , case when trim(both from customer_creator_id)='' then null else customer_creator_id::text end customer_creator_id + , case when trim(both from customer_creator_name)='' then null else customer_creator_name::text end customer_creator_name + , case when trim(both from customer_modifier_id)='' then null else customer_modifier_id::text end customer_modifier_id + , case when trim(both from customer_modifier_name)='' then null else customer_modifier_name::text end customer_modifier_name + , case when trim(both from tags)='' then null else tags::text end tags + , case when trim(both from system_form)='' then null else system_form::text end system_form +,etl_tx_dt +from ( +select + (json_array_elements(data::json)::json->>'id') id + , (json_array_elements(data::json)::json->>'workflowId') workflow_id + , (json_array_elements(data::json)::json->>'workflowName') workflow_name + , (json_array_elements(data::json)::json->>'type') type + , (json_array_elements(data::json)::json->>'topic') topic + , (json_array_elements(data::json)::json->>'level') level + , (json_array_elements(data::json)::json->>'status') status + , (json_array_elements(data::json)::json->>'creatorName') creator_name + , (json_array_elements(data::json)::json->>'creatorId') creator_id + , (json_array_elements(data::json)::json->>'creatorType') creator_type + , (json_array_elements(data::json)::json->>'modifierId') modifier_id + , (json_array_elements(data::json)::json->>'modifierType') modifier_type + , (json_array_elements(data::json)::json->>'source') source + , (json_array_elements(data::json)::json->>'timeout') timeout + , (json_array_elements(data::json)::json->>'endTime') end_time + , (json_array_elements(data::json)::json->>'createTime') create_time + , (json_array_elements(data::json)::json->>'closeTime') close_time + , (json_array_elements(data::json)::json->>'stateSelected') state_selected + , (json_array_elements(data::json)::json->>'lastReminderTime') last_reminder_time + , (json_array_elements(data::json)::json->>'reminderCount') reminder_count + , (json_array_elements(data::json)::json->>'customerId') customer_id + , (json_array_elements(data::json)::json->>'customerName') customer_name + , (json_array_elements(data::json)::json->>'customerTel') customer_tel + , (json_array_elements(data::json)::json->>'customerEmail') customer_email + , (json_array_elements(data::json)::json->>'customerAddress') customer_address + , (json_array_elements(data::json)::json->>'customerCreatorId') customer_creator_id + , (json_array_elements(data::json)::json->>'customerCreatorName') customer_creator_name + , (json_array_elements(data::json)::json->>'customerModifierId') customer_modifier_id + , (json_array_elements(data::json)::json->>'customerModifierName') customer_modifier_name + , (json_array_elements(data::json)::json->>'tags') tags + , (json_array_elements(data::json)::json->>'systemForm') system_form + ,CURRENT_TIMESTAMP(0) etl_tx_dt +from (select * from data_api.api_data +WHERE api_id='c83284b6bbb148daa6b3d04173ab748f' and is_loaded = '0' order by request_tm desc limit 1) p )p; + +update data_api.api_data +set is_loaded = '1' , + status = '1', + request_tm = current_timestamp(0) +where api_id='c83284b6bbb148daa6b3d04173ab748f'; +\q \ No newline at end of file