add workflow 天润Smart-ccc会话数据,dev

This commit is contained in:
root 2024-07-03 13:31:02 +08:00
parent e6495ffd5b
commit 27df073e70
11 changed files with 1292 additions and 0 deletions

View File

@ -0,0 +1,74 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
delete from p10_sa.S98_S_tr_chat_messages
;
insert into p10_sa.S98_S_tr_chat_messages
( unique_id
, main_unique_id
, sender_id
, sender_name
, sender_type
, message_type
, content
, file_key
, file_name
, file_url
, send_status
, sensitive_word
, create_time
, etl_tx_dt )
select
unique_id
, main_unique_id
, sender_id
, sender_name
, sender_type
, message_type
, content
, file_key
, file_name
, file_url
, send_status
, sensitive_word
, create_time
, etl_tx_dt
from p00_tal.S98_S_tr_chat_messages
;
delete from p12_sfull.S98_S_tr_chat_messages
;
;
insert into p12_sfull.S98_S_tr_chat_messages
( unique_id
, main_unique_id
, sender_id
, sender_name
, sender_type
, message_type
, content
, file_key
, file_name
, file_url
, send_status
, sensitive_word
, create_time
, etl_tx_dt )
select
unique_id
, main_unique_id
, sender_id
, sender_name
, sender_type
, message_type
, content
, file_key
, file_name
, file_url
, send_status
, sensitive_word
, create_time
, etl_tx_dt
from p10_sa.S98_S_tr_chat_messages
;
\q

View File

@ -0,0 +1,25 @@
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_tr_chat_messages (
unique_id TEXT
, main_unique_id TEXT
, sender_id TEXT
, sender_name TEXT
, sender_type TEXT
, message_type TEXT
, content TEXT
, file_key TEXT
, file_name TEXT
, file_url TEXT
, send_status TEXT
, sensitive_word TEXT
, create_time TEXT
, etl_tx_dt TIMESTAMP
)
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'tr_chat_messages' );

View File

@ -0,0 +1,71 @@
create table if not exists p10_sa.S98_S_tr_chat_messages (
unique_id TEXT
, main_unique_id TEXT
, sender_id TEXT
, sender_name TEXT
, sender_type TEXT
, message_type TEXT
, content TEXT
, file_key TEXT
, file_name TEXT
, file_url TEXT
, send_status TEXT
, sensitive_word TEXT
, create_time TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.unique_id IS ' 消息唯一标识';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.main_unique_id IS '在线客服会话ID';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.sender_id IS '消息发送方ID';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.sender_name IS '消息发送方名称';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.sender_type IS '消息发送方类型';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.message_type IS '消息类型';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.content IS '消息内容';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.file_key IS '文件的地址';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.file_name IS '文件名称';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.file_url IS '文件访问路径';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.send_status IS '消息发送状态';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.sensitive_word IS '消息内容包含敏感词';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.create_time IS '消息创建时间戳';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_messages.etl_tx_dt IS '';
COMMENT ON TABLE p10_sa.S98_S_tr_chat_messages IS '会话消息详情';
create table if not exists p12_sfull.S98_S_tr_chat_messages (
unique_id TEXT
, main_unique_id TEXT
, sender_id TEXT
, sender_name TEXT
, sender_type TEXT
, message_type TEXT
, content TEXT
, file_key TEXT
, file_name TEXT
, file_url TEXT
, send_status TEXT
, sensitive_word TEXT
, create_time TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.unique_id IS ' 消息唯一标识';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.main_unique_id IS '在线客服会话ID';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.sender_id IS '消息发送方ID';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.sender_name IS '消息发送方名称';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.sender_type IS '消息发送方类型';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.message_type IS '消息类型';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.content IS '消息内容';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.file_key IS '文件的地址';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.file_name IS '文件名称';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.file_url IS '文件访问路径';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.send_status IS '消息发送状态';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.sensitive_word IS '消息内容包含敏感词';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.create_time IS '消息创建时间戳';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_messages.etl_tx_dt IS '';
COMMENT ON TABLE p12_sfull.S98_S_tr_chat_messages IS '会话消息详情';

View File

@ -0,0 +1,214 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
delete from p10_sa.S98_S_tr_chat_records
;
insert into p10_sa.S98_S_tr_chat_records
( main_unique_id
, contact_type
, app_id
, app_name
, visitor_id
, visitor_name
, robot_id
, robot_name
, robot_start_time
, robot_end_time
, qno
, qname
, cno
, client_name
, chat_duration_type
, close_status
, close_reason
, open_type
, first_response_duration
, start_time
, end_time
, queue_time
, join_queue_time
, bridge_time
, session_type
, chat_valid_session
, comment_session
, invitation_initiator
, keys
, remark
, ip
, customer_region
, chat_times
, invited_times
, search_word
, operating_system
, browser
, referer_name
, referer_url
, first_visit_page_url
, initiation_page_url
, visit_tracks
, visitor_extra_info
, device_type
, app_type
, province
, city
, referrer_name
, etl_tx_dt )
select
main_unique_id
, contact_type
, app_id
, app_name
, visitor_id
, visitor_name
, robot_id
, robot_name
, robot_start_time
, robot_end_time
, qno
, qname
, cno
, client_name
, chat_duration_type
, close_status
, close_reason
, open_type
, first_response_duration
, start_time
, end_time
, queue_time
, join_queue_time
, bridge_time
, session_type
, chat_valid_session
, comment_session
, invitation_initiator
, keys
, remark
, ip
, customer_region
, chat_times
, invited_times
, search_word
, operating_system
, browser
, referer_name
, referer_url
, first_visit_page_url
, initiation_page_url
, visit_tracks
, visitor_extra_info
, device_type
, app_type
, province
, city
, referrer_name
, etl_tx_dt
from p00_tal.S98_S_tr_chat_records
;
delete from p12_sfull.S98_S_tr_chat_records where main_unique_id in (select main_unique_id from p10_sa.S98_S_tr_chat_records)
;
;
insert into p12_sfull.S98_S_tr_chat_records
( main_unique_id
, contact_type
, app_id
, app_name
, visitor_id
, visitor_name
, robot_id
, robot_name
, robot_start_time
, robot_end_time
, qno
, qname
, cno
, client_name
, chat_duration_type
, close_status
, close_reason
, open_type
, first_response_duration
, start_time
, end_time
, queue_time
, join_queue_time
, bridge_time
, session_type
, chat_valid_session
, comment_session
, invitation_initiator
, keys
, remark
, ip
, customer_region
, chat_times
, invited_times
, search_word
, operating_system
, browser
, referer_name
, referer_url
, first_visit_page_url
, initiation_page_url
, visit_tracks
, visitor_extra_info
, device_type
, app_type
, province
, city
, referrer_name
, etl_tx_dt )
select
main_unique_id
, contact_type
, app_id
, app_name
, visitor_id
, visitor_name
, robot_id
, robot_name
, robot_start_time
, robot_end_time
, qno
, qname
, cno
, client_name
, chat_duration_type
, close_status
, close_reason
, open_type
, first_response_duration
, start_time
, end_time
, queue_time
, join_queue_time
, bridge_time
, session_type
, chat_valid_session
, comment_session
, invitation_initiator
, keys
, remark
, ip
, customer_region
, chat_times
, invited_times
, search_word
, operating_system
, browser
, referer_name
, referer_url
, first_visit_page_url
, initiation_page_url
, visit_tracks
, visitor_extra_info
, device_type
, app_type
, province
, city
, referrer_name
, etl_tx_dt
from p10_sa.S98_S_tr_chat_records
;
\q

View File

@ -0,0 +1,60 @@
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_tr_chat_records (
main_unique_id TEXT
, contact_type TEXT
, app_id TEXT
, app_name TEXT
, visitor_id TEXT
, visitor_name TEXT
, robot_id TEXT
, robot_name TEXT
, robot_start_time TEXT
, robot_end_time TEXT
, qno TEXT
, qname TEXT
, cno TEXT
, client_name TEXT
, chat_duration_type TEXT
, close_status TEXT
, close_reason TEXT
, open_type TEXT
, first_response_duration TEXT
, start_time TEXT
, end_time TEXT
, queue_time TEXT
, join_queue_time TEXT
, bridge_time TEXT
, session_type TEXT
, chat_valid_session TEXT
, comment_session TEXT
, invitation_initiator TEXT
, keys TEXT
, remark TEXT
, ip TEXT
, customer_region TEXT
, chat_times TEXT
, invited_times TEXT
, search_word TEXT
, operating_system TEXT
, browser TEXT
, referer_name TEXT
, referer_url TEXT
, first_visit_page_url TEXT
, initiation_page_url TEXT
, visit_tracks TEXT
, visitor_extra_info TEXT
, device_type TEXT
, app_type TEXT
, province TEXT
, city TEXT
, referrer_name TEXT
, etl_tx_dt TIMESTAMP
)
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'tr_chat_records' );

View File

@ -0,0 +1,211 @@
create table if not exists p10_sa.S98_S_tr_chat_records (
main_unique_id TEXT
, contact_type TEXT
, app_id TEXT
, app_name TEXT
, visitor_id TEXT
, visitor_name TEXT
, robot_id TEXT
, robot_name TEXT
, robot_start_time TEXT
, robot_end_time TEXT
, qno TEXT
, qname TEXT
, cno TEXT
, client_name TEXT
, chat_duration_type TEXT
, close_status TEXT
, close_reason TEXT
, open_type TEXT
, first_response_duration TEXT
, start_time TEXT
, end_time TEXT
, queue_time TEXT
, join_queue_time TEXT
, bridge_time TEXT
, session_type TEXT
, chat_valid_session TEXT
, comment_session TEXT
, invitation_initiator TEXT
, keys TEXT
, remark TEXT
, ip TEXT
, customer_region TEXT
, chat_times TEXT
, invited_times TEXT
, search_word TEXT
, operating_system TEXT
, browser TEXT
, referer_name TEXT
, referer_url TEXT
, first_visit_page_url TEXT
, initiation_page_url TEXT
, visit_tracks TEXT
, visitor_extra_info TEXT
, device_type TEXT
, app_type TEXT
, province TEXT
, city TEXT
, referrer_name TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.main_unique_id IS '在线客服会话ID';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.contact_type IS '接入渠道';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.app_id IS '接入号ID';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.app_name IS '接入号名称';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.visitor_id IS '访客ID';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.visitor_name IS '访客姓名';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.robot_id IS '机器人客服ID';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.robot_name IS '机器人客服名称';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.robot_start_time IS '机器人客服接入会话时间戳';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.robot_end_time IS '机器人客服结束会话时间戳';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.qno IS '接入队列号';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.qname IS '队列名称';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.cno IS '接待座席号';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.client_name IS '座席名称';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.chat_duration_type IS '接待类型';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.close_status IS '会话结束时状态';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.close_reason IS '会话结束原因';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.open_type IS '会话发起方式';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.first_response_duration IS '座席首次响应时长';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.start_time IS '会话开始时间戳';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.end_time IS '会话结束时间戳';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.queue_time IS '排队时长';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.join_queue_time IS '接入队列时间戳';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.bridge_time IS '座席接入时间戳';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.session_type IS '会话是否有人工客服接入';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.chat_valid_session IS '是否是有效会话';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.comment_session IS '是否是通过留言发起的会话';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.invitation_initiator IS '满意度评价发起方式';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.keys IS '满意度评价';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.remark IS '满意度评价备注';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.ip IS '访客IP';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.customer_region IS '访问地区';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.chat_times IS '会话次数';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.invited_times IS '被邀请次数';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.search_word IS '搜索词,通过对来源页解析,获取搜索参数参数。';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.operating_system IS '操作系统';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.browser IS '浏览器型号';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.referer_name IS '访客来源';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.referer_url IS '来源页';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.first_visit_page_url IS '着陆页';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.initiation_page_url IS '会话发起页';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.visit_tracks IS '浏览轨迹';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.visitor_extra_info IS '访客自定义参数';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.device_type IS '访客设备类型';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.app_type IS '接入渠道类型';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.province IS '访客会话时所在省份';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.city IS '访客会话时所在城市';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.referrer_name IS '访客来源';
COMMENT ON COLUMN p10_sa.S98_S_tr_chat_records.etl_tx_dt IS '';
COMMENT ON TABLE p10_sa.S98_S_tr_chat_records IS '会话记录列表';
create table if not exists p12_sfull.S98_S_tr_chat_records (
main_unique_id TEXT
, contact_type TEXT
, app_id TEXT
, app_name TEXT
, visitor_id TEXT
, visitor_name TEXT
, robot_id TEXT
, robot_name TEXT
, robot_start_time TEXT
, robot_end_time TEXT
, qno TEXT
, qname TEXT
, cno TEXT
, client_name TEXT
, chat_duration_type TEXT
, close_status TEXT
, close_reason TEXT
, open_type TEXT
, first_response_duration TEXT
, start_time TEXT
, end_time TEXT
, queue_time TEXT
, join_queue_time TEXT
, bridge_time TEXT
, session_type TEXT
, chat_valid_session TEXT
, comment_session TEXT
, invitation_initiator TEXT
, keys TEXT
, remark TEXT
, ip TEXT
, customer_region TEXT
, chat_times TEXT
, invited_times TEXT
, search_word TEXT
, operating_system TEXT
, browser TEXT
, referer_name TEXT
, referer_url TEXT
, first_visit_page_url TEXT
, initiation_page_url TEXT
, visit_tracks TEXT
, visitor_extra_info TEXT
, device_type TEXT
, app_type TEXT
, province TEXT
, city TEXT
, referrer_name TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.main_unique_id IS '在线客服会话ID';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.contact_type IS '接入渠道';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.app_id IS '接入号ID';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.app_name IS '接入号名称';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.visitor_id IS '访客ID';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.visitor_name IS '访客姓名';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.robot_id IS '机器人客服ID';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.robot_name IS '机器人客服名称';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.robot_start_time IS '机器人客服接入会话时间戳';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.robot_end_time IS '机器人客服结束会话时间戳';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.qno IS '接入队列号';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.qname IS '队列名称';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.cno IS '接待座席号';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.client_name IS '座席名称';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.chat_duration_type IS '接待类型';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.close_status IS '会话结束时状态';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.close_reason IS '会话结束原因';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.open_type IS '会话发起方式';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.first_response_duration IS '座席首次响应时长';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.start_time IS '会话开始时间戳';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.end_time IS '会话结束时间戳';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.queue_time IS '排队时长';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.join_queue_time IS '接入队列时间戳';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.bridge_time IS '座席接入时间戳';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.session_type IS '会话是否有人工客服接入';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.chat_valid_session IS '是否是有效会话';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.comment_session IS '是否是通过留言发起的会话';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.invitation_initiator IS '满意度评价发起方式';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.keys IS '满意度评价';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.remark IS '满意度评价备注';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.ip IS '访客IP';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.customer_region IS '访问地区';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.chat_times IS '会话次数';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.invited_times IS '被邀请次数';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.search_word IS '搜索词,通过对来源页解析,获取搜索参数参数。';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.operating_system IS '操作系统';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.browser IS '浏览器型号';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.referer_name IS '访客来源';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.referer_url IS '来源页';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.first_visit_page_url IS '着陆页';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.initiation_page_url IS '会话发起页';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.visit_tracks IS '浏览轨迹';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.visitor_extra_info IS '访客自定义参数';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.device_type IS '访客设备类型';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.app_type IS '接入渠道类型';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.province IS '访客会话时所在省份';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.city IS '访客会话时所在城市';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.referrer_name IS '访客来源';
COMMENT ON COLUMN p12_sfull.S98_S_tr_chat_records.etl_tx_dt IS '';
COMMENT ON TABLE p12_sfull.S98_S_tr_chat_records IS '会话记录列表';

View File

@ -0,0 +1,81 @@
#!/usr/bin/python
# -*- encoding=utf-8 -*-
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import json
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
sshHook = SSHHook(ssh_conn_id ='ssh_air')
default_args = {
'owner': 'info@idgvalue.com',
'email_on_failure': True,
'email_on_retry':True,
'start_date': datetime(2024, 1, 1),
'depends_on_past': False,
'retries': 6,
'retry_delay': timedelta(minutes=10),
}
dag = DAG('wf_dag_smart_ccc_chat', default_args=default_args,
schedule_interval="0 0-23/1 * * *",
catchup=False,
dagrun_timeout=timedelta(minutes=160),
max_active_runs=3)
task_failed = EmailOperator (
dag=dag,
trigger_rule=TriggerRule.ONE_FAILED,
task_id="task_failed",
to=["info@idgvalue.com"],
cc=[""],
subject="smart_ccc_chat_failed",
html_content='<h3>您好smart_ccc_chat作业失败请及时处理" </h3>')
chat_records_feign = SSHOperator(
ssh_hook=sshHook,
task_id='chat_records_feign',
command='python3 /data/airflow/etl/API/chat_records_feign.py',
depends_on_past=False,
retries=3,
dag=dag)
chat_records_load = SSHOperator(
ssh_hook=sshHook,
task_id='chat_records_load',
command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
params={'my_param':"chat_records_load"},
depends_on_past=False,
retries=3,
dag=dag)
chat_records_feign >> chat_records_load
tr_chat_messages_4800 = SSHOperator(
ssh_hook=sshHook,
task_id='tr_chat_messages_4800',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"S98_S_tr_chat_messages"},
depends_on_past=False,
retries=3,
dag=dag)
tr_chat_records_2337 = SSHOperator(
ssh_hook=sshHook,
task_id='tr_chat_records_2337',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"S98_S_tr_chat_records"},
depends_on_past=False,
retries=3,
dag=dag)
chat_records_load >> tr_chat_records_2337
chat_records_load >> tr_chat_messages_4800
tr_chat_messages_4800 >> task_failed
tr_chat_records_2337 >> task_failed

View File

@ -0,0 +1,204 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
import time
import hashlib
import time
import hmac
import base64
import urllib.parse
import hashlib
from collections import OrderedDict
from urllib.parse import quote_plus
#全局变量,便于参数使用的预设值
current_date = datetime.date.today() # 获取当前日期
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
timestamp = time.time() # 为Unix time即从"1970-01-01 00:00:00"至今的秒数;
sign_version = 'v2' # 签名版本号固定值v2
nonce = str(uuid.uuid4())
current_time_utc =( datetime.datetime.now() - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%SZ")
formatted2_current_date = current_date.strftime("%Y-%m-%d %H:%M:%S") # 获取当前日期 - 标准化
formatted2_previous_date = previous_date.strftime("%Y-%m-%d %H:%M:%S") # 获取前一天日期 - 标准化
def formatted2_previous_hour(h):
if h==0:
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S")
def previous_hour_timestamp(h):
if h==0:
return int(time.time())
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
return int(start_of_previous_hour.timestamp())
#计算签名
def generate_signature(str, private_key):
signature = hmac.new(private_key.encode(), (str).encode(), hashlib.sha1)
signature_b64 = base64.b64encode(signature.digest()).decode()
return signature_b64
#构建查询链接
def build_query_string(params):
# 使用OrderedDict来保持排序
sorted_params = OrderedDict(sorted(params.items()))
# 拼接属性名和属性值,并使用&连接
query_string = '&'.join('{}={}'.format(
urllib.parse.quote_plus(k),
urllib.parse.quote_plus(str(v))
) for k, v in sorted_params.items())
return query_string
def request_data_signature_get():
print(f'开始请求会话记录数据')
url='https://api-bj.clink.cn/livechat/copy_chat_records'
param={'Timestamp':current_time_utc,'Expires':86400,'date':formatted2_previous_date,'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177',}
print(f'param: {param}')
url_path = build_query_string(param)
url_param = url_path
print(f'url_param: {url_param}')
url_param = f'GETapi-bj.clink.cn/livechat/copy_chat_records?{url_param}'
print(f'待计算字符串: {url_param}')
signature= generate_signature(url_param,'5g027B6w06630Y5240c1')
print(f'计算签名: {signature}')
print(f'编码后签名: {urllib.parse.quote_plus(signature)}')
url = f'{url}?{url_path}&Signature={urllib.parse.quote_plus(signature)}'
print(f'url: {url}')
dataReqL=requests.get(url,headers={},params={})
resText = dataReqL.text
i = 0
while 'error' in resText and i < 5:
print(f'请求会话记录失败,再次请求第{i+1}')
time.sleep(1)
dataReqL=requests.get(url,headers={},params={})
resText = dataReqL.text
i = i + 1
resD=json.loads(resText)
return resD
def request_detail_signature_get(id):
print(f'开始请求会话详情数据:{id}')
url='https://api-bj.clink.cn/livechat/list_chat_messages'
param={'Timestamp':current_time_utc,'Expires':86400,'mainUniqueId':id,'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177'}
print(f'param: {param}')
url_path = build_query_string(param)
url_param = url_path
print(f'url_param: {url_param}')
url_param = f'GETapi-bj.clink.cn/livechat/list_chat_messages?{url_param}'
print(f'待计算字符串: {url_param}')
signature= generate_signature(url_param,'5g027B6w06630Y5240c1')
print(f'计算签名: {signature}')
print(f'编码后签名: {urllib.parse.quote_plus(signature)}')
url = f'{url}?{url_path}&Signature={urllib.parse.quote_plus(signature)}'
print(f'url: {url}')
dataReqL=requests.get(url,headers={},params={})
resText = dataReqL.text
i = 0
while 'error' in resText and i < 5:
print(f'请求会话记录失败,再次请求第{i+1}')
time.sleep(1)
dataReqL=requests.get(url,headers={},params={})
resText = dataReqL.text
i = i + 1
resD=json.loads(resText)
return resD
def load_data_to_db(dataList):
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
total=len(dataList)
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = 'f31fc54503de4ceda87ec9b3c99e4d0e';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'f31fc54503de4ceda87ec9b3c99e4d0e', json_object, total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束chat_records:查询会话记录列表')
def load_detail_data_to_db(ids, dataList):
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
total=len(dataList)
print('临时id'+dataId)
json_object = json.dumps(dataList)
idstr = ','.join(ids)
cur=conn.cursor()
sql="update data_api.cc_details_ids_exp set is_loaded = '1' where api_id = '211b765fe3e04082a9f83a29e3b0c37c' and id in (%s); INSERT INTO data_api.cc_message_details (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[idstr,dataId,'211b765fe3e04082a9f83a29e3b0c37c', json_object, total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束message_detail:获取会话详情')
def load_detail_exp_to_db(id):
try:
print(f'添加查询工单异常记录:{id}')
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
cur=conn.cursor()
sql=" INSERT INTO data_api.cc_details_ids_exp (id,api_id,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[id, '211b765fe3e04082a9f83a29e3b0c37c'])
conn.commit()
cur.close()
conn.close()
print(f'添加查询工单异常记录:{id} 结束')
except Exception as e:
print(f'添加查询工单异常记录:{id}失败, 错误信息:{e}')
if __name__ == "__main__":
print(f'{formatted2_previous_hour(0)}开始请求会话信息')
resL = request_data_signature_get()
print(resL)
if 'error' in resL:
error = resL['error']
print(f'请求会话列表失败,失败原因:{error}')
else:
dataList = resL['records']
load_data_to_db(dataList)
detailDataList = []
ids = []
for data in dataList:
try:
id = data['mainUniqueId']
resD = request_detail_signature_get(id)
#print(f"请求工单详情结束")
if 'records' in resD:
ids.append(id)
dataList = resD['records']
detailDataList.append(dataList)
else:
error = resD['error']
print(f"请求会话详情id:{id})失败,错误信息:{error}")
load_detail_exp_to_db(id)
except Exception as e:
print(f'请求会话详情id:{id})异常, )异常信息:{e}')
load_detail_exp_to_db(data['id'])
if len(ids) > 0:
ids_str = [str(item) for item in ids]
load_detail_data_to_db(ids_str,detailDataList)
print(f'{formatted2_previous_hour(0)}请求会话信息结束')

View File

@ -0,0 +1,168 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.tr_chat_records;
insert into data_api.tr_chat_records (
main_unique_id
, contact_type
, app_id
, app_name
, visitor_id
, visitor_name
, robot_id
, robot_name
, robot_start_time
, robot_end_time
, qno
, qname
, cno
, client_name
, chat_duration_type
, close_status
, close_reason
, open_type
, first_response_duration
, start_time
, end_time
, queue_time
, join_queue_time
, bridge_time
, session_type
, chat_valid_session
, comment_session
, invitation_initiator
, keys
, remark
, ip
, customer_region
, chat_times
, invited_times
, search_word
, operating_system
, browser
, referer_name
, referer_url
, first_visit_page_url
, initiation_page_url
, visit_tracks
, visitor_extra_info
, device_type
, app_type
, province
, city
, referrer_name
,etl_tx_dt
)
select
case when trim(both from main_unique_id)='' then null else main_unique_id::text end main_unique_id
, case when trim(both from contact_type)='' then null else contact_type::text end contact_type
, case when trim(both from app_id)='' then null else app_id::text end app_id
, case when trim(both from app_name)='' then null else app_name::text end app_name
, case when trim(both from visitor_id)='' then null else visitor_id::text end visitor_id
, case when trim(both from visitor_name)='' then null else visitor_name::text end visitor_name
, case when trim(both from robot_id)='' then null else robot_id::text end robot_id
, case when trim(both from robot_name)='' then null else robot_name::text end robot_name
, case when trim(both from robot_start_time)='' then null else robot_start_time::text end robot_start_time
, case when trim(both from robot_end_time)='' then null else robot_end_time::text end robot_end_time
, case when trim(both from qno)='' then null else qno::text end qno
, case when trim(both from qname)='' then null else qname::text end qname
, case when trim(both from cno)='' then null else cno::text end cno
, case when trim(both from client_name)='' then null else client_name::text end client_name
, case when trim(both from chat_duration_type)='' then null else chat_duration_type::text end chat_duration_type
, case when trim(both from close_status)='' then null else close_status::text end close_status
, case when trim(both from close_reason)='' then null else close_reason::text end close_reason
, case when trim(both from open_type)='' then null else open_type::text end open_type
, case when trim(both from first_response_duration)='' then null else first_response_duration::text end first_response_duration
, case when trim(both from start_time)='' then null else start_time::text end start_time
, case when trim(both from end_time)='' then null else end_time::text end end_time
, case when trim(both from queue_time)='' then null else queue_time::text end queue_time
, case when trim(both from join_queue_time)='' then null else join_queue_time::text end join_queue_time
, case when trim(both from bridge_time)='' then null else bridge_time::text end bridge_time
, case when trim(both from session_type)='' then null else session_type::text end session_type
, case when trim(both from chat_valid_session)='' then null else chat_valid_session::text end chat_valid_session
, case when trim(both from comment_session)='' then null else comment_session::text end comment_session
, case when trim(both from invitation_initiator)='' then null else invitation_initiator::text end invitation_initiator
, case when trim(both from keys)='' then null else keys::text end keys
, case when trim(both from remark)='' then null else remark::text end remark
, case when trim(both from ip)='' then null else ip::text end ip
, case when trim(both from customer_region)='' then null else customer_region::text end customer_region
, case when trim(both from chat_times)='' then null else chat_times::text end chat_times
, case when trim(both from invited_times)='' then null else invited_times::text end invited_times
, case when trim(both from search_word)='' then null else search_word::text end search_word
, case when trim(both from operating_system)='' then null else operating_system::text end operating_system
, case when trim(both from browser)='' then null else browser::text end browser
, case when trim(both from referer_name)='' then null else referer_name::text end referer_name
, case when trim(both from referer_url)='' then null else referer_url::text end referer_url
, case when trim(both from first_visit_page_url)='' then null else first_visit_page_url::text end first_visit_page_url
, case when trim(both from initiation_page_url)='' then null else initiation_page_url::text end initiation_page_url
, case when trim(both from visit_tracks)='' then null else visit_tracks::text end visit_tracks
, case when trim(both from visitor_extra_info)='' then null else visitor_extra_info::text end visitor_extra_info
, case when trim(both from device_type)='' then null else device_type::text end device_type
, case when trim(both from app_type)='' then null else app_type::text end app_type
, case when trim(both from province)='' then null else province::text end province
, case when trim(both from city)='' then null else city::text end city
, case when trim(both from referrer_name)='' then null else referrer_name::text end referrer_name
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'mainUniqueId') main_unique_id
, (json_array_elements(data::json)::json->>'contactType') contact_type
, (json_array_elements(data::json)::json->>'appId') app_id
, (json_array_elements(data::json)::json->>'appName') app_name
, (json_array_elements(data::json)::json->>'visitorId') visitor_id
, (json_array_elements(data::json)::json->>'visitorName') visitor_name
, (json_array_elements(data::json)::json->>'robotId') robot_id
, (json_array_elements(data::json)::json->>'robotName') robot_name
, (json_array_elements(data::json)::json->>'robotStartTime') robot_start_time
, (json_array_elements(data::json)::json->>'robotEndTime') robot_end_time
, (json_array_elements(data::json)::json->>'qno') qno
, (json_array_elements(data::json)::json->>'qname') qname
, (json_array_elements(data::json)::json->>'cno') cno
, (json_array_elements(data::json)::json->>'clientName') client_name
, (json_array_elements(data::json)::json->>'chatDurationType') chat_duration_type
, (json_array_elements(data::json)::json->>'closeStatus') close_status
, (json_array_elements(data::json)::json->>'closeReason') close_reason
, (json_array_elements(data::json)::json->>'openType') open_type
, (json_array_elements(data::json)::json->>'firstResponseDuration') first_response_duration
, (json_array_elements(data::json)::json->>'startTime') start_time
, (json_array_elements(data::json)::json->>'endTime') end_time
, (json_array_elements(data::json)::json->>'queueTime') queue_time
, (json_array_elements(data::json)::json->>'joinQueueTime') join_queue_time
, (json_array_elements(data::json)::json->>'bridgeTime') bridge_time
, (json_array_elements(data::json)::json->>'sessionType') session_type
, (json_array_elements(data::json)::json->>'chatValidSession') chat_valid_session
, (json_array_elements(data::json)::json->>'commentSession') comment_session
, (json_array_elements(data::json)::json->>'invitationInitiator') invitation_initiator
, (json_array_elements(data::json)::json->>'keys') keys
, (json_array_elements(data::json)::json->>'remark') remark
, (json_array_elements(data::json)::json->>'ip') ip
, (json_array_elements(data::json)::json->>'customerRegion') customer_region
, (json_array_elements(data::json)::json->>'chatTimes') chat_times
, (json_array_elements(data::json)::json->>'invitedTimes') invited_times
, (json_array_elements(data::json)::json->>'searchWord') search_word
, (json_array_elements(data::json)::json->>'operatingSystem') operating_system
, (json_array_elements(data::json)::json->>'browser') browser
, (json_array_elements(data::json)::json->>'refererName') referer_name
, (json_array_elements(data::json)::json->>'refererUrl') referer_url
, (json_array_elements(data::json)::json->>'firstVisitPageUrl') first_visit_page_url
, (json_array_elements(data::json)::json->>'initiationPageUrl') initiation_page_url
, (json_array_elements(data::json)::json->>'visitTracks') visit_tracks
, (json_array_elements(data::json)::json->>'visitorExtraInfo') visitor_extra_info
, (json_array_elements(data::json)::json->>'deviceType') device_type
, (json_array_elements(data::json)::json->>'appType') app_type
, (json_array_elements(data::json)::json->>'province') province
, (json_array_elements(data::json)::json->>'city') city
, (json_array_elements(data::json)::json->>'referrerName') referrer_name
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='f31fc54503de4ceda87ec9b3c99e4d0e' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='f31fc54503de4ceda87ec9b3c99e4d0e';
\q

View File

@ -0,0 +1,121 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
import time
import hashlib
import time
import hmac
import base64
import urllib.parse
import hashlib
from collections import OrderedDict
from urllib.parse import quote_plus
#全局变量,便于参数使用的预设值
current_date = datetime.date.today() # 获取当前日期
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
timestamp = time.time() # 为Unix time即从"1970-01-01 00:00:00"至今的秒数;
sign_version = 'v2' # 签名版本号固定值v2
nonce = str(uuid.uuid4())
current_time_utc =( datetime.datetime.now() - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%SZ")
formatted2_current_date = current_date.strftime("%Y-%m-%d %H:%M:%S") # 获取当前日期 - 标准化
formatted2_previous_date = previous_date.strftime("%Y-%m-%d %H:%M:%S") # 获取前一天日期 - 标准化
def formatted2_previous_hour(h):
if h==0:
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S")
def previous_hour_timestamp(h):
if h==0:
return int(time.time())
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
return int(start_of_previous_hour.timestamp())
#计算签名
def generate_signature(str, private_key):
signature = hmac.new(private_key.encode(), (str).encode(), hashlib.sha1)
signature_b64 = base64.b64encode(signature.digest()).decode()
return signature_b64
#构建查询链接
def build_query_string(params):
# 使用OrderedDict来保持排序
sorted_params = OrderedDict(sorted(params.items()))
# 拼接属性名和属性值,并使用&连接
query_string = '&'.join('{}={}'.format(
urllib.parse.quote_plus(k),
urllib.parse.quote_plus(str(v))
) for k, v in sorted_params.items())
return query_string
#计算签名get请求
def request_data_signature_get():
print('开始请求数据...')
url='https://api-bj.clink.cn/livechat/list_chat_messages'
param={'mainUniqueId':'1','AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','Timestamp':current_time_utc,'Expires':Expires,}
print(f'param: {param}')
paramJson = {"mainUniqueId":"1","Timestamp":""}
print(f'paramJson: {paramJson}')
url_path = build_query_string(param)
url_param = build_query_string(param)
print(f'url_param: {url_param}')
url_param = f'GETapi-bj.clink.cn/livechat/list_chat_messages?{url_param}'
print(f'待计算字符串: {url_param}')
signature= generate_signature(url_param,'5g027B6w06630Y5240c1')
print(f'计算签名: {signature}')
print(f'编码后签名: {urllib.parse.quote_plus(signature)}')
url = f'{url}?{url_path}&Signature={urllib.parse.quote_plus(signature)}'
print(f'url: {url}')
dataReqL=requests.get(url,headers={},params={})
i = 0
while 'error' in dataReqL and i < 5:
time.sleep(1)
dataReqL=requests.get(url,headers={},params={})
i = i + 1
resL=json.loads(dataReqL.text)
return resL
def load_data_to_db(dataList):
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
print('数据库连接成功')
dataId=str(uuid.uuid4())
total=len(dataList)
print('临时id'+dataId)
json_object = json.dumps(dataList)
cur=conn.cursor()
sql="update data_api.api_data set is_loaded = '1' where api_id = '211b765fe3e04082a9f83a29e3b0c37c';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
cur.execute(sql,[dataId,'211b765fe3e04082a9f83a29e3b0c37c', json_object, total])
conn.commit()
cur.close()
conn.close()
print('加载数据结束chat_messages:查询消息记录列表')
if __name__ == "__main__":
resL = request_data_signature_get()
print(resL)
if 'error' in resL:
load_error_to_db(resl)
load_data_to_db(resL[''records'])

View File

@ -0,0 +1,63 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM data_api.tr_chat_messages;
insert into data_api.tr_chat_messages (
unique_id
, main_unique_id
, sender_id
, sender_name
, sender_type
, message_type
, content
, file_key
, file_name
, file_url
, send_status
, sensitive_word
, create_time
,etl_tx_dt
)
select
case when trim(both from unique_id)='' then null else unique_id::text end unique_id
, case when trim(both from main_unique_id)='' then null else main_unique_id::text end main_unique_id
, case when trim(both from sender_id)='' then null else sender_id::text end sender_id
, case when trim(both from sender_name)='' then null else sender_name::text end sender_name
, case when trim(both from sender_type)='' then null else sender_type::text end sender_type
, case when trim(both from message_type)='' then null else message_type::text end message_type
, case when trim(both from content)='' then null else content::text end content
, case when trim(both from file_key)='' then null else file_key::text end file_key
, case when trim(both from file_name)='' then null else file_name::text end file_name
, case when trim(both from file_url)='' then null else file_url::text end file_url
, case when trim(both from send_status)='' then null else send_status::text end send_status
, case when trim(both from sensitive_word)='' then null else sensitive_word::text end sensitive_word
, case when trim(both from create_time)='' then null else create_time::text end create_time
,etl_tx_dt
from (
select
(json_array_elements(data::json)::json->>'uniqueId') unique_id
, (json_array_elements(data::json)::json->>'mainUniqueId') main_unique_id
, (json_array_elements(data::json)::json->>'senderId') sender_id
, (json_array_elements(data::json)::json->>'senderName') sender_name
, (json_array_elements(data::json)::json->>'senderType') sender_type
, (json_array_elements(data::json)::json->>'messageType') message_type
, (json_array_elements(data::json)::json->>'content') content
, (json_array_elements(data::json)::json->>'fileKey') file_key
, (json_array_elements(data::json)::json->>'fileName') file_name
, (json_array_elements(data::json)::json->>'fileUrl') file_url
, (json_array_elements(data::json)::json->>'sendStatus') send_status
, (json_array_elements(data::json)::json->>'sensitiveWord') sensitive_word
, (json_array_elements(data::json)::json->>'createTime') create_time
,CURRENT_TIMESTAMP(0) etl_tx_dt
from (select * from data_api.api_data
WHERE api_id='211b765fe3e04082a9f83a29e3b0c37c' and is_loaded = '0' order by request_tm desc limit 1) p )p;
update data_api.api_data
set is_loaded = '1' ,
status = '1',
request_tm = current_timestamp(0)
where api_id='211b765fe3e04082a9f83a29e3b0c37c';
\q