add workflow 天润Smart-ccc客户数据,dev

This commit is contained in:
root 2024-07-02 11:57:21 +08:00
parent f3ca806e61
commit 3b2692fb01
13 changed files with 1308 additions and 0 deletions

View File

@ -0,0 +1,174 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
delete from p10_sa.S98_S_tr_custom_details
;
insert into p10_sa.S98_S_tr_custom_details
( id
, name
, sex
, tel
, email
, address
, level
, share_type
, share
, remark
, source
, creator_type
, creator_id
, modifier_type
, modifier_id
, last_contact_time
, last_contact_type
, customize
, create_time
, update_time
, visitor_ids
, creator_name
, modifier_name
, ib_bridged_number
, ob_bridged_number
, ob_number
, assign_time
, external_id
, ib_number
, retrieve
, retrieve_time
, queue_without_attribution
, phase_id
, phase_reason_id
, promote_source
, repeat_promote_count
, last_repeat_promote_time
, label_ids
, etl_tx_dt )
select
id
, name
, sex
, tel
, email
, address
, level
, share_type
, share
, remark
, source
, creator_type
, creator_id
, modifier_type
, modifier_id
, last_contact_time
, last_contact_type
, customize
, create_time
, update_time
, visitor_ids
, creator_name
, modifier_name
, ib_bridged_number
, ob_bridged_number
, ob_number
, assign_time
, external_id
, ib_number
, retrieve
, retrieve_time
, queue_without_attribution
, phase_id
, phase_reason_id
, promote_source
, repeat_promote_count
, last_repeat_promote_time
, label_ids
, etl_tx_dt
from p00_tal.S98_S_tr_custom_details
;
delete from p12_sfull.S98_S_tr_custom_details
;
;
insert into p12_sfull.S98_S_tr_custom_details
( id
, name
, sex
, tel
, email
, address
, level
, share_type
, share
, remark
, source
, creator_type
, creator_id
, modifier_type
, modifier_id
, last_contact_time
, last_contact_type
, customize
, create_time
, update_time
, visitor_ids
, creator_name
, modifier_name
, ib_bridged_number
, ob_bridged_number
, ob_number
, assign_time
, external_id
, ib_number
, retrieve
, retrieve_time
, queue_without_attribution
, phase_id
, phase_reason_id
, promote_source
, repeat_promote_count
, last_repeat_promote_time
, label_ids
, etl_tx_dt )
select
id
, name
, sex
, tel
, email
, address
, level
, share_type
, share
, remark
, source
, creator_type
, creator_id
, modifier_type
, modifier_id
, last_contact_time
, last_contact_type
, customize
, create_time
, update_time
, visitor_ids
, creator_name
, modifier_name
, ib_bridged_number
, ob_bridged_number
, ob_number
, assign_time
, external_id
, ib_number
, retrieve
, retrieve_time
, queue_without_attribution
, phase_id
, phase_reason_id
, promote_source
, repeat_promote_count
, last_repeat_promote_time
, label_ids
, etl_tx_dt
from p10_sa.S98_S_tr_custom_details
;
\q

View File

@ -0,0 +1,50 @@
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_tr_custom_details (
id TEXT
, name TEXT
, sex TEXT
, tel TEXT
, email TEXT
, address TEXT
, level TEXT
, share_type TEXT
, share TEXT
, remark TEXT
, source TEXT
, creator_type TEXT
, creator_id TEXT
, modifier_type TEXT
, modifier_id TEXT
, last_contact_time TEXT
, last_contact_type TEXT
, customize TEXT
, create_time TEXT
, update_time TEXT
, visitor_ids TEXT
, creator_name TEXT
, modifier_name TEXT
, ib_bridged_number TEXT
, ob_bridged_number TEXT
, ob_number TEXT
, assign_time TEXT
, external_id TEXT
, ib_number TEXT
, retrieve TEXT
, retrieve_time TEXT
, queue_without_attribution TEXT
, phase_id TEXT
, phase_reason_id TEXT
, promote_source TEXT
, repeat_promote_count TEXT
, last_repeat_promote_time TEXT
, label_ids TEXT
, etl_tx_dt TIMESTAMP
)
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'tr_custom_details' );

View File

@ -0,0 +1,171 @@
create table if not exists p10_sa.S98_S_tr_custom_details (
id TEXT
, name TEXT
, sex TEXT
, tel TEXT
, email TEXT
, address TEXT
, level TEXT
, share_type TEXT
, share TEXT
, remark TEXT
, source TEXT
, creator_type TEXT
, creator_id TEXT
, modifier_type TEXT
, modifier_id TEXT
, last_contact_time TEXT
, last_contact_type TEXT
, customize TEXT
, create_time TEXT
, update_time TEXT
, visitor_ids TEXT
, creator_name TEXT
, modifier_name TEXT
, ib_bridged_number TEXT
, ob_bridged_number TEXT
, ob_number TEXT
, assign_time TEXT
, external_id TEXT
, ib_number TEXT
, retrieve TEXT
, retrieve_time TEXT
, queue_without_attribution TEXT
, phase_id TEXT
, phase_reason_id TEXT
, promote_source TEXT
, repeat_promote_count TEXT
, last_repeat_promote_time TEXT
, label_ids TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.id IS '客户资料id';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.name IS '客户名称';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.sex IS '客户性别';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.tel IS '客户号码';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.email IS '邮箱';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.address IS '地址';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.level IS '客户等级';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.share_type IS '归属类型0全体共享、1员工组共享、2员工私有、3无归属';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.share IS '客户归属';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.remark IS '备注';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.source IS '客户来源';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.creator_type IS '创建人类型';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.creator_id IS '创建人id-1openApi';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.modifier_type IS '更新人类型';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.modifier_id IS '更新人id-1openApi';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.last_contact_time IS '最后一次联系时间';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.last_contact_type IS '最后一次联系类型';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.customize IS '该对象中id为自定义字段id name为自定义字段名称 value为自定义字段值';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.create_time IS '创建时间';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.update_time IS '更新时间';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.visitor_ids IS '访客ids';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.creator_name IS '创建人名称';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.modifier_name IS '更新人名称';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.ib_bridged_number IS '呼入接通次数';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.ob_bridged_number IS '呼出接通次数';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.ob_number IS '呼出次数';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.assign_time IS '分配时间';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.external_id IS '外部企业客户ID (第三方平台 ID';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.ib_number IS '呼入次数';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.retrieve IS '是否为回收客户';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.retrieve_time IS '回收时间';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.queue_without_attribution IS '无归属授权员工组';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.phase_id IS '客户阶段id';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.phase_reason_id IS '阶段原因id';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.promote_source IS '推广来源';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.repeat_promote_count IS '重复推广次数';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.last_repeat_promote_time IS '最近一次重复推广时间';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.label_ids IS '客户标签id';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_details.etl_tx_dt IS '';
COMMENT ON TABLE p10_sa.S98_S_tr_custom_details IS '客户资料详情';
create table if not exists p12_sfull.S98_S_tr_custom_details (
id TEXT
, name TEXT
, sex TEXT
, tel TEXT
, email TEXT
, address TEXT
, level TEXT
, share_type TEXT
, share TEXT
, remark TEXT
, source TEXT
, creator_type TEXT
, creator_id TEXT
, modifier_type TEXT
, modifier_id TEXT
, last_contact_time TEXT
, last_contact_type TEXT
, customize TEXT
, create_time TEXT
, update_time TEXT
, visitor_ids TEXT
, creator_name TEXT
, modifier_name TEXT
, ib_bridged_number TEXT
, ob_bridged_number TEXT
, ob_number TEXT
, assign_time TEXT
, external_id TEXT
, ib_number TEXT
, retrieve TEXT
, retrieve_time TEXT
, queue_without_attribution TEXT
, phase_id TEXT
, phase_reason_id TEXT
, promote_source TEXT
, repeat_promote_count TEXT
, last_repeat_promote_time TEXT
, label_ids TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.id IS '客户资料id';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.name IS '客户名称';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.sex IS '客户性别';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.tel IS '客户号码';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.email IS '邮箱';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.address IS '地址';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.level IS '客户等级';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.share_type IS '归属类型0全体共享、1员工组共享、2员工私有、3无归属';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.share IS '客户归属';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.remark IS '备注';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.source IS '客户来源';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.creator_type IS '创建人类型';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.creator_id IS '创建人id-1openApi';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.modifier_type IS '更新人类型';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.modifier_id IS '更新人id-1openApi';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.last_contact_time IS '最后一次联系时间';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.last_contact_type IS '最后一次联系类型';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.customize IS '该对象中id为自定义字段id name为自定义字段名称 value为自定义字段值';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.create_time IS '创建时间';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.update_time IS '更新时间';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.visitor_ids IS '访客ids';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.creator_name IS '创建人名称';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.modifier_name IS '更新人名称';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.ib_bridged_number IS '呼入接通次数';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.ob_bridged_number IS '呼出接通次数';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.ob_number IS '呼出次数';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.assign_time IS '分配时间';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.external_id IS '外部企业客户ID (第三方平台 ID';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.ib_number IS '呼入次数';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.retrieve IS '是否为回收客户';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.retrieve_time IS '回收时间';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.queue_without_attribution IS '无归属授权员工组';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.phase_id IS '客户阶段id';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.phase_reason_id IS '阶段原因id';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.promote_source IS '推广来源';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.repeat_promote_count IS '重复推广次数';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.last_repeat_promote_time IS '最近一次重复推广时间';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.label_ids IS '客户标签id';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_details.etl_tx_dt IS '';
COMMENT ON TABLE p12_sfull.S98_S_tr_custom_details IS '客户资料详情';

View File

@ -0,0 +1,30 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
delete from p10_sa.S98_S_tr_custom_labels
;
insert into p10_sa.S98_S_tr_custom_labels
( label_group_name
, customer_label_list
, etl_tx_dt )
select
label_group_name
, customer_label_list
, etl_tx_dt
from p00_tal.S98_S_tr_custom_labels
;
delete from p12_sfull.S98_S_tr_custom_labels
;
;
insert into p12_sfull.S98_S_tr_custom_labels
( label_group_name
, customer_label_list
, etl_tx_dt )
select
label_group_name
, customer_label_list
, etl_tx_dt
from p10_sa.S98_S_tr_custom_labels
;
\q

View File

@ -0,0 +1,14 @@
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_tr_custom_labels (
label_group_name TEXT
, customer_label_list TEXT
, etl_tx_dt TIMESTAMP
)
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'tr_custom_labels' );

View File

@ -0,0 +1,27 @@
create table if not exists p10_sa.S98_S_tr_custom_labels (
label_group_name TEXT
, customer_label_list TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_labels.label_group_name IS '标签分组名称';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_labels.customer_label_list IS '分组下的标签对象数组';
COMMENT ON COLUMN p10_sa.S98_S_tr_custom_labels.etl_tx_dt IS '';
COMMENT ON TABLE p10_sa.S98_S_tr_custom_labels IS '客户标签列表';
create table if not exists p12_sfull.S98_S_tr_custom_labels (
label_group_name TEXT
, customer_label_list TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_labels.label_group_name IS '标签分组名称';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_labels.customer_label_list IS '分组下的标签对象数组';
COMMENT ON COLUMN p12_sfull.S98_S_tr_custom_labels.etl_tx_dt IS '';
COMMENT ON TABLE p12_sfull.S98_S_tr_custom_labels IS '客户标签列表';

View File

@ -0,0 +1,90 @@
#!/usr/bin/python
# -*- encoding=utf-8 -*-
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import json
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
sshHook = SSHHook(ssh_conn_id ='ssh_air')
default_args = {
'owner': 'info@idgvalue.com',
'email_on_failure': True,
'email_on_retry':True,
'start_date': datetime(2024, 1, 1),
'depends_on_past': False,
'retries': 6,
'retry_delay': timedelta(minutes=10),
}
dag = DAG('wf_dag_smart_ccc_custom', default_args=default_args,
schedule_interval="0 0-23/1 * * *",
catchup=False,
dagrun_timeout=timedelta(minutes=160),
max_active_runs=3)
task_failed = EmailOperator (
dag=dag,
trigger_rule=TriggerRule.ONE_FAILED,
task_id="task_failed",
to=["info@idgvalue.com"],
cc=[""],
subject="smart_ccc_custom_failed",
html_content='<h3>您好smart_ccc_custom作业失败请及时处理" </h3>')
customer_list_feign = SSHOperator(
ssh_hook=sshHook,
task_id='customer_list_feign',
command='python3 /data/airflow/etl/API/customer_list_feign.py',
depends_on_past=False,
retries=3,
dag=dag)
customer_labels_feign = SSHOperator(
ssh_hook=sshHook,
task_id='customer_labels_feign',
command='python3 /data/airflow/etl/API/customer_labels_feign.py',
depends_on_past=False,
retries=3,
dag=dag)
customer_labels_load = SSHOperator(
ssh_hook=sshHook,
task_id='customer_labels_load',
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
params={'my_param':"customer_labels_load"},
depends_on_past=False,
retries=3,
dag=dag)
customer_labels_feign >> customer_labels_load
tr_custom_details_5516 = SSHOperator(
ssh_hook=sshHook,
task_id='tr_custom_details_5516',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"S98_S_tr_custom_details"},
depends_on_past=False,
retries=3,
dag=dag)
tr_custom_labels_8280 = SSHOperator(
ssh_hook=sshHook,
task_id='tr_custom_labels_8280',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"S98_S_tr_custom_labels"},
depends_on_past=False,
retries=3,
dag=dag)
customer_labels_load >> tr_custom_labels_8280
customer_list_feign >> tr_custom_details_5516
tr_custom_details_5516 >> task_failed
tr_custom_labels_8280 >> task_failed

View File

@ -0,0 +1,123 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
import time
import hashlib
import time
import hmac
import base64
import urllib.parse
import hashlib
from collections import OrderedDict
from urllib.parse import quote_plus
#全局变量,便于参数使用的预设值
current_date = datetime.date.today() # 获取当前日期
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
timestamp = time.time() # 为Unix time即从"1970-01-01 00:00:00"至今的秒数;
sign_version = 'v2' # 签名版本号固定值v2
nonce = str(uuid.uuid4())
current_time_utc =( datetime.datetime.now() - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%SZ")
formatted2_current_date = current_date.strftime("%Y-%m-%d %H:%M:%S") # 获取当前日期 - 标准化
formatted2_previous_date = previous_date.strftime("%Y-%m-%d %H:%M:%S") # 获取前一天日期 - 标准化
def formatted2_previous_hour(h):
if h==0:
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S")
def previous_hour_timestamp(h):
if h==0:
return int(time.time())
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
return int(start_of_previous_hour.timestamp())
#计算签名
def generate_signature(str, private_key):
signature = hmac.new(private_key.encode(), (str).encode(), hashlib.sha1)
signature_b64 = base64.b64encode(signature.digest()).decode()
return signature_b64
#构建查询链接
def build_query_string(params):
# 使用OrderedDict来保持排序
sorted_params = OrderedDict(sorted(params.items()))
# 拼接属性名和属性值,并使用&连接
query_string = '&'.join('{}={}'.format(
urllib.parse.quote_plus(k),
urllib.parse.quote_plus(str(v))
) for k, v in sorted_params.items())
return query_string
def request_data_signature_post():
print('开始请求数据...')
url='https://api-bj.clink.cn/crm/list_customer_labels'
header={}
param={'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','Timestamp':current_time_utc,'Expires':Expires,}
print(f'param: {param}')
paramJson = {"Timestamp":""}
print(f'paramJson: {paramJson}')
url_path = build_query_string(param)
url_param = build_query_string(param)
print(f'url_param: {url_param}')
url_param = f'POSTapi-bj.clink.cn/crm/list_customer_labels?{url_param}'
print(f'url_param2: {url_param}')
signature= generate_signature(url_param,'5g027B6w06630Y5240c1')
print(f'signature: {signature}')
url = f'{url}?{url_path}&Signature={signature}'
print(f'url: {url}')
body={}
print(f'body: {body}')
dataReqL=requests.post(url,headers=header,params=body)
i = 0
while 'error' in dataReqL and i < 5:
time.sleep(1)
dataReqL=requests.post(url,headers=header,params=body)
i = i + 1
resL=json.loads(dataReqL.text)
print(dataReqL)
resL=json.loads(dataReqL.text)
return resL
def load_data_to_db(dataList):
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
total=len(dataList)
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = '02d5a32d8736457fa51d668652cc50af';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'02d5a32d8736457fa51d668652cc50af', json_object, total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束customer_labels:查询客户资料可用标签')
if __name__ == "__main__":
resL = request_data_signature_post()
print(resL)
if 'error' in resL:
load_error_to_db(resl)
load_data_to_db(resL[''customerLabels'])

View File

@ -0,0 +1,30 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.tr_custom_labels;
insert into data_api.tr_custom_labels (
label_group_name
, customer_label_list
,etl_tx_dt
)
select
case when trim(both from label_group_name)='' then null else label_group_name::text end label_group_name
, case when trim(both from customer_label_list)='' then null else customer_label_list::text end customer_label_list
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'labelGroupName') label_group_name
, (json_array_elements(data::json)::json->>'customerLabelList') customer_label_list
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='02d5a32d8736457fa51d668652cc50af' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='02d5a32d8736457fa51d668652cc50af';
\q

View File

@ -0,0 +1,123 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
import time
import hashlib
import time
import hmac
import base64
import urllib.parse
import hashlib
from collections import OrderedDict
from urllib.parse import quote_plus
#全局变量,便于参数使用的预设值
current_date = datetime.date.today() # 获取当前日期
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
timestamp = time.time() # 为Unix time即从"1970-01-01 00:00:00"至今的秒数;
sign_version = 'v2' # 签名版本号固定值v2
nonce = str(uuid.uuid4())
current_time_utc =( datetime.datetime.now() - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%SZ")
formatted2_current_date = current_date.strftime("%Y-%m-%d %H:%M:%S") # 获取当前日期 - 标准化
formatted2_previous_date = previous_date.strftime("%Y-%m-%d %H:%M:%S") # 获取前一天日期 - 标准化
def formatted2_previous_hour(h):
if h==0:
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S")
def previous_hour_timestamp(h):
if h==0:
return int(time.time())
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
return int(start_of_previous_hour.timestamp())
#计算签名
def generate_signature(str, private_key):
signature = hmac.new(private_key.encode(), (str).encode(), hashlib.sha1)
signature_b64 = base64.b64encode(signature.digest()).decode()
return signature_b64
#构建查询链接
def build_query_string(params):
# 使用OrderedDict来保持排序
sorted_params = OrderedDict(sorted(params.items()))
# 拼接属性名和属性值,并使用&连接
query_string = '&'.join('{}={}'.format(
urllib.parse.quote_plus(k),
urllib.parse.quote_plus(str(v))
) for k, v in sorted_params.items())
return query_string
def request_data_signature_post():
print('开始请求数据...')
url='https://api-bj.clink.cn/crm/query_customer'
header={}
param={'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','Timestamp':current_time_utc,'Expires':'86400',}
print(f'param: {param}')
paramJson = {"Expires":"86400","Timestamp":""}
print(f'paramJson: {paramJson}')
url_path = build_query_string(param)
url_param = build_query_string(param)
print(f'url_param: {url_param}')
url_param = f'POSTapi-bj.clink.cn/crm/query_customer?{url_param}'
print(f'url_param2: {url_param}')
signature= generate_signature(url_param,'5g027B6w06630Y5240c1')
print(f'signature: {signature}')
url = f'{url}?{url_path}&Signature={signature}'
print(f'url: {url}')
body={'customerId':'33',}
print(f'body: {body}')
dataReqL=requests.post(url,headers=header,params=body)
i = 0
while 'error' in dataReqL and i < 5:
time.sleep(1)
dataReqL=requests.post(url,headers=header,params=body)
i = i + 1
resL=json.loads(dataReqL.text)
print(dataReqL)
resL=json.loads(dataReqL.text)
return resL
def load_data_to_db(dataList):
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
total=len(dataList)
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = '010d4668242c4b96b4964693edcf5556';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'010d4668242c4b96b4964693edcf5556', json_object, total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束customer_detail:获取客户资料信息')
if __name__ == "__main__":
resL = request_data_signature_post()
print(resL)
if 'error' in resL:
load_error_to_db(resl)
load_data_to_db(resL[''customer'])

View File

@ -0,0 +1,138 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.tr_custom_details;
insert into data_api.tr_custom_details (
id
, name
, sex
, tel
, email
, address
, level
, share_type
, share
, remark
, source
, creator_type
, creator_id
, modifier_type
, modifier_id
, last_contact_time
, last_contact_type
, customize
, create_time
, update_time
, visitor_ids
, creator_name
, modifier_name
, ib_bridged_number
, ob_bridged_number
, ob_number
, assign_time
, external_id
, ib_number
, retrieve
, retrieve_time
, queue_without_attribution
, phase_id
, phase_reason_id
, promote_source
, repeat_promote_count
, last_repeat_promote_time
, label_ids
,etl_tx_dt
)
select
case when trim(both from id)='' then null else id::text end id
, case when trim(both from name)='' then null else name::text end name
, case when trim(both from sex)='' then null else sex::text end sex
, case when trim(both from tel)='' then null else tel::text end tel
, case when trim(both from email)='' then null else email::text end email
, case when trim(both from address)='' then null else address::text end address
, case when trim(both from level)='' then null else level::text end level
, case when trim(both from share_type)='' then null else share_type::text end share_type
, case when trim(both from share)='' then null else share::text end share
, case when trim(both from remark)='' then null else remark::text end remark
, case when trim(both from source)='' then null else source::text end source
, case when trim(both from creator_type)='' then null else creator_type::text end creator_type
, case when trim(both from creator_id)='' then null else creator_id::text end creator_id
, case when trim(both from modifier_type)='' then null else modifier_type::text end modifier_type
, case when trim(both from modifier_id)='' then null else modifier_id::text end modifier_id
, case when trim(both from last_contact_time)='' then null else last_contact_time::text end last_contact_time
, case when trim(both from last_contact_type)='' then null else last_contact_type::text end last_contact_type
, case when trim(both from customize)='' then null else customize::text end customize
, case when trim(both from create_time)='' then null else create_time::text end create_time
, case when trim(both from update_time)='' then null else update_time::text end update_time
, case when trim(both from visitor_ids)='' then null else visitor_ids::text end visitor_ids
, case when trim(both from creator_name)='' then null else creator_name::text end creator_name
, case when trim(both from modifier_name)='' then null else modifier_name::text end modifier_name
, case when trim(both from ib_bridged_number)='' then null else ib_bridged_number::text end ib_bridged_number
, case when trim(both from ob_bridged_number)='' then null else ob_bridged_number::text end ob_bridged_number
, case when trim(both from ob_number)='' then null else ob_number::text end ob_number
, case when trim(both from assign_time)='' then null else assign_time::text end assign_time
, case when trim(both from external_id)='' then null else external_id::text end external_id
, case when trim(both from ib_number)='' then null else ib_number::text end ib_number
, case when trim(both from retrieve)='' then null else retrieve::text end retrieve
, case when trim(both from retrieve_time)='' then null else retrieve_time::text end retrieve_time
, case when trim(both from queue_without_attribution)='' then null else queue_without_attribution::text end queue_without_attribution
, case when trim(both from phase_id)='' then null else phase_id::text end phase_id
, case when trim(both from phase_reason_id)='' then null else phase_reason_id::text end phase_reason_id
, case when trim(both from promote_source)='' then null else promote_source::text end promote_source
, case when trim(both from repeat_promote_count)='' then null else repeat_promote_count::text end repeat_promote_count
, case when trim(both from last_repeat_promote_time)='' then null else last_repeat_promote_time::text end last_repeat_promote_time
, case when trim(both from label_ids)='' then null else label_ids::text end label_ids
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'name') name
, (json_array_elements(data::json)::json->>'sex') sex
, (json_array_elements(data::json)::json->>'tel') tel
, (json_array_elements(data::json)::json->>'email') email
, (json_array_elements(data::json)::json->>'address') address
, (json_array_elements(data::json)::json->>'level') level
, (json_array_elements(data::json)::json->>'shareType') share_type
, (json_array_elements(data::json)::json->>'share') share
, (json_array_elements(data::json)::json->>'remark') remark
, (json_array_elements(data::json)::json->>'source') source
, (json_array_elements(data::json)::json->>'creatorType') creator_type
, (json_array_elements(data::json)::json->>'creatorId') creator_id
, (json_array_elements(data::json)::json->>'modifierType') modifier_type
, (json_array_elements(data::json)::json->>'modifierId') modifier_id
, (json_array_elements(data::json)::json->>'lastContactTime') last_contact_time
, (json_array_elements(data::json)::json->>'lastContactType') last_contact_type
, (json_array_elements(data::json)::json->>'customize') customize
, (json_array_elements(data::json)::json->>'createTime') create_time
, (json_array_elements(data::json)::json->>'updateTime') update_time
, (json_array_elements(data::json)::json->>'visitorIds') visitor_ids
, (json_array_elements(data::json)::json->>'creatorName') creator_name
, (json_array_elements(data::json)::json->>'modifierName') modifier_name
, (json_array_elements(data::json)::json->>'ibBridgedNumber') ib_bridged_number
, (json_array_elements(data::json)::json->>'obBridgedNumber') ob_bridged_number
, (json_array_elements(data::json)::json->>'obNumber') ob_number
, (json_array_elements(data::json)::json->>'assignTime') assign_time
, (json_array_elements(data::json)::json->>'externalId') external_id
, (json_array_elements(data::json)::json->>'ibNumber') ib_number
, (json_array_elements(data::json)::json->>'retrieve') retrieve
, (json_array_elements(data::json)::json->>'retrieveTime') retrieve_time
, (json_array_elements(data::json)::json->>'queueWithoutAttribution') queue_without_attribution
, (json_array_elements(data::json)::json->>'phaseId') phase_id
, (json_array_elements(data::json)::json->>'phaseReasonId') phase_reason_id
, (json_array_elements(data::json)::json->>'promoteSource') promote_source
, (json_array_elements(data::json)::json->>'repeatPromoteCount') repeat_promote_count
, (json_array_elements(data::json)::json->>'lastRepeatPromoteTime') last_repeat_promote_time
, (json_array_elements(data::json)::json->>'labelIds') label_ids
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='010d4668242c4b96b4964693edcf5556' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='010d4668242c4b96b4964693edcf5556';
\q

View File

@ -0,0 +1,221 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
import time
import hashlib
import time
import hmac
import base64
import urllib.parse
import hashlib
from collections import OrderedDict
from urllib.parse import quote_plus
#全局变量,便于参数使用的预设值
current_date = datetime.date.today() # 获取当前日期
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
timestamp = time.time() # 为Unix time即从"1970-01-01 00:00:00"至今的秒数;
sign_version = 'v2' # 签名版本号固定值v2
nonce = str(uuid.uuid4())
current_time_utc =( datetime.datetime.now() - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%SZ")
formatted2_current_date = current_date.strftime("%Y-%m-%d %H:%M:%S") # 获取当前日期 - 标准化
formatted2_previous_date = previous_date.strftime("%Y-%m-%d %H:%M:%S") # 获取前一天日期 - 标准化
def formatted2_previous_hour(h):
if h==0:
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S")
def previous_hour_timestamp(h):
if h==0:
return int(time.time())
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
return int(start_of_previous_hour.timestamp())
#计算签名
def generate_signature(str, private_key):
signature = hmac.new(private_key.encode(), (str).encode(), hashlib.sha1)
signature_b64 = base64.b64encode(signature.digest()).decode()
return signature_b64
#构建查询链接
def build_query_string(params):
# 使用OrderedDict来保持排序
sorted_params = OrderedDict(sorted(params.items()))
# 拼接属性名和属性值,并使用&连接
query_string = '&'.join('{}={}'.format(
urllib.parse.quote_plus(k),
urllib.parse.quote_plus(str(v))
) for k, v in sorted_params.items())
return query_string
#构建查询链接
def build_query_string(params):
# 使用OrderedDict来保持排序
sorted_params = OrderedDict(sorted(params.items()))
# 拼接属性名和属性值,并使用&连接
query_string = '&'.join('{}={}'.format(
urllib.parse.quote_plus(k),
urllib.parse.quote_plus(str(v))
) for k, v in sorted_params.items())
return query_string
#计算签名get请求
def request_list_signature_get():
print('开始请求数据...')
url='https://api-bj.clink.cn/crm/list_customers'
# param={'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','Timestamp':current_time_utc,'Expires':86400,'updateStartTime':previous_hour_timestamp(1),'updateEndTime':previous_hour_timestamp(0)}
param={'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','Timestamp':current_time_utc,'Expires':86400,'updateStartTime':1704038400,'updateEndTime':previous_hour_timestamp(0)}
print(f'param: {param}')
url_path = build_query_string(param)
url_param = url_path
print(f'url_param: {url_param}')
url_param = f'GETapi-bj.clink.cn/crm/list_customers?{url_param}'
print(f'待计算字符串: {url_param}')
signature= generate_signature(url_param,'5g027B6w06630Y5240c1')
print(f'计算签名: {signature}')
print(f'编码后签名: {urllib.parse.quote_plus(signature)}')
url = f'{url}?{url_path}&Signature={urllib.parse.quote_plus(signature)}'
print(f'url: {url}')
dataReqL=requests.get(url,headers={},params={})
resText = dataReqL.text
i = 0
while 'error' in resText and i < 5:
print(f'请求客户资料列表失败,再次请求第{i+1}')
time.sleep(1)
dataReqL=requests.get(url,headers={},params={})
resText = dataReqL.text
i = i + 1
resL=json.loads(resText)
return resL
def request_detail_signature_post(customerId):
print(f'开始请求客户详情:{formatted2_previous_hour(0)}')
url='https://api-bj.clink.cn/crm/query_customer'
header={'Content-Type':'application/json;charset=UTF-8'}
param={'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','Timestamp':current_time_utc,'Expires':86400}
print(f'param: {param}')
url_path = build_query_string(param)
url_param = url_path
print(f'url_param: {url_param}')
url_param = f'POSTapi-bj.clink.cn/crm/query_customer?{url_param}'
print(f'待计算字符串: {url_param}')
signature= generate_signature(url_param,'5g027B6w06630Y5240c1')
print(f'计算签名: {signature}')
print(f'编码后签名: {urllib.parse.quote_plus(signature)}')
url = f'{url}?{url_path}&Signature={urllib.parse.quote_plus(signature)}'
print(f'url: {url}')
body={'customerId':customerId}
jsonData = json.dumps(body)
print(f'body: {jsonData}')
dataReqL=requests.post(url,headers=header,data=jsonData)
resText = dataReqL.text
i = 0
while 'error' in resText and i < 5:
print(f'请求客户详情失败,再次请求第{i+1}')
time.sleep(1)
dataReqL=requests.post(url,headers=header,params=body)
i = i + 1
resL=json.loads(resText)
return resL
def load_data_to_db(dataList):
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
total=len(dataList)
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = 'c83284b6bbb148daa6b3d04173ab748f';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'c83284b6bbb148daa6b3d04173ab748f', json_object, total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束customer_list:获取客户资料列表')
def load_detail_data_to_db(ids, dataList):
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
total=len(dataList)
print('临时id'+dataId)
json_object = json.dumps(dataList)
idstr = ','.join(ids)
cur=conn.cursor()
sql="update data_api.cc_details_ids_exp set is_loaded = '1' where api_id = '010d4668242c4b96b4964693edcf5556' and id in (%s); INSERT INTO data_api.tr_custom_details (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[idstr,dataId,'010d4668242c4b96b4964693edcf5556', json_object, total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束tickets_detail:获取客户资料详情')
def load_detail_exp_to_db(id):
try:
print(f'添加查询客户资料异常记录:{id}')
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
cur=conn.cursor()
sql=" INSERT INTO data_api.cc_details_ids_exp (id,api_id,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[id, '010d4668242c4b96b4964693edcf5556'])
conn.commit()
cur.close()
conn.close()
print(f'添加查询客户资料异常记录:{id} 结束')
except Exception as e:
print(f'添加查询客户资料异常记录:{id}失败, 错误信息:{e}')
if __name__ == "__main__":
print(f'{formatted2_previous_hour(0)}开始请求客户资料信息')
resL = request_list_signature_get()
print(resL)
if 'error' in resL:
error = resL['error']
print(f'请求客户资料列表失败,失败原因:{error}')
else:
dataList = resL['customers']
load_data_to_db(dataList)
detailDataList = []
ids = []
for data in dataList:
try:
for item in data:
if item['key'] == -1:
id=item['value']
print(f'客户id:{id},开始请求数据')
resD = request_detail_signature_post(id)
print(f'请求数据结束{id},结果:{resD}')
if 'customer' in resD:
ids.append(id)
dataList = resD['customer']
detailDataList.append(dataList)
else:
error = resD['error']
print(f"请求客户资料详情id:{id})失败,错误信息:{error}")
load_detail_exp_to_db(id)
except Exception as e:
print(f'请求客户资料详情id:{id})异常, )异常信息:{e}')
load_detail_exp_to_db(data['id'])
print(f'444:{ids}')
if len(ids) > 0:
ids_str = [str(item) for item in ids]
load_detail_data_to_db(ids_str,detailDataList)
print(f'{formatted2_previous_hour(0)}请求客户资料信息结束')

View File

@ -0,0 +1,117 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.tr_ticket_list;
insert into data_api.tr_ticket_list (
id
, workflow_id
, workflow_name
, type
, topic
, level
, status
, creator_name
, creator_id
, creator_type
, modifier_id
, modifier_type
, source
, timeout
, end_time
, create_time
, close_time
, state_selected
, last_reminder_time
, reminder_count
, customer_id
, customer_name
, customer_tel
, customer_email
, customer_address
, customer_creator_id
, customer_creator_name
, customer_modifier_id
, customer_modifier_name
, tags
, system_form
,etl_tx_dt
)
select
case when trim(both from id)='' then null else id::text end id
, case when trim(both from workflow_id)='' then null else workflow_id::text end workflow_id
, case when trim(both from workflow_name)='' then null else workflow_name::text end workflow_name
, case when trim(both from type)='' then null else type::text end type
, case when trim(both from topic)='' then null else topic::text end topic
, case when trim(both from level)='' then null else level::text end level
, case when trim(both from status)='' then null else status::text end status
, case when trim(both from creator_name)='' then null else creator_name::text end creator_name
, case when trim(both from creator_id)='' then null else creator_id::text end creator_id
, case when trim(both from creator_type)='' then null else creator_type::text end creator_type
, case when trim(both from modifier_id)='' then null else modifier_id::text end modifier_id
, case when trim(both from modifier_type)='' then null else modifier_type::text end modifier_type
, case when trim(both from source)='' then null else source::text end source
, case when trim(both from timeout)='' then null else timeout::text end timeout
, case when trim(both from end_time)='' then null else end_time::text end end_time
, case when trim(both from create_time)='' then null else create_time::text end create_time
, case when trim(both from close_time)='' then null else close_time::text end close_time
, case when trim(both from state_selected)='' then null else state_selected::text end state_selected
, case when trim(both from last_reminder_time)='' then null else last_reminder_time::text end last_reminder_time
, case when trim(both from reminder_count)='' then null else reminder_count::text end reminder_count
, case when trim(both from customer_id)='' then null else customer_id::text end customer_id
, case when trim(both from customer_name)='' then null else customer_name::text end customer_name
, case when trim(both from customer_tel)='' then null else customer_tel::text end customer_tel
, case when trim(both from customer_email)='' then null else customer_email::text end customer_email
, case when trim(both from customer_address)='' then null else customer_address::text end customer_address
, case when trim(both from customer_creator_id)='' then null else customer_creator_id::text end customer_creator_id
, case when trim(both from customer_creator_name)='' then null else customer_creator_name::text end customer_creator_name
, case when trim(both from customer_modifier_id)='' then null else customer_modifier_id::text end customer_modifier_id
, case when trim(both from customer_modifier_name)='' then null else customer_modifier_name::text end customer_modifier_name
, case when trim(both from tags)='' then null else tags::text end tags
, case when trim(both from system_form)='' then null else system_form::text end system_form
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'id') id
, (json_array_elements(data::json)::json->>'workflowId') workflow_id
, (json_array_elements(data::json)::json->>'workflowName') workflow_name
, (json_array_elements(data::json)::json->>'type') type
, (json_array_elements(data::json)::json->>'topic') topic
, (json_array_elements(data::json)::json->>'level') level
, (json_array_elements(data::json)::json->>'status') status
, (json_array_elements(data::json)::json->>'creatorName') creator_name
, (json_array_elements(data::json)::json->>'creatorId') creator_id
, (json_array_elements(data::json)::json->>'creatorType') creator_type
, (json_array_elements(data::json)::json->>'modifierId') modifier_id
, (json_array_elements(data::json)::json->>'modifierType') modifier_type
, (json_array_elements(data::json)::json->>'source') source
, (json_array_elements(data::json)::json->>'timeout') timeout
, (json_array_elements(data::json)::json->>'endTime') end_time
, (json_array_elements(data::json)::json->>'createTime') create_time
, (json_array_elements(data::json)::json->>'closeTime') close_time
, (json_array_elements(data::json)::json->>'stateSelected') state_selected
, (json_array_elements(data::json)::json->>'lastReminderTime') last_reminder_time
, (json_array_elements(data::json)::json->>'reminderCount') reminder_count
, (json_array_elements(data::json)::json->>'customerId') customer_id
, (json_array_elements(data::json)::json->>'customerName') customer_name
, (json_array_elements(data::json)::json->>'customerTel') customer_tel
, (json_array_elements(data::json)::json->>'customerEmail') customer_email
, (json_array_elements(data::json)::json->>'customerAddress') customer_address
, (json_array_elements(data::json)::json->>'customerCreatorId') customer_creator_id
, (json_array_elements(data::json)::json->>'customerCreatorName') customer_creator_name
, (json_array_elements(data::json)::json->>'customerModifierId') customer_modifier_id
, (json_array_elements(data::json)::json->>'customerModifierName') customer_modifier_name
, (json_array_elements(data::json)::json->>'tags') tags
, (json_array_elements(data::json)::json->>'systemForm') system_form
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='c83284b6bbb148daa6b3d04173ab748f' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='c83284b6bbb148daa6b3d04173ab748f';
\q