add workflow 天润Smart-ccc会话数据,dev

This commit is contained in:
root 2024-08-30 15:41:28 +08:00
parent 6d1d390af1
commit 7f47f0c678
1 changed files with 62 additions and 38 deletions

View File

@ -26,6 +26,18 @@ current_time_utc =( datetime.datetime.now() - datetime.timedelta(hours=8)).strft
formatted2_current_date = current_date.strftime("%Y-%m-%d %H:%M:%S") # 获取当前日期 - 标准化
# database="dataops_db"
# user="dbuser_dba"
# password="EmBRxnmmjnE3"
# host="124.221.232.219"
# port="5432"
database="dataops_db"
user="dbuser_dops"
password="MIgTi3jA"
host="172.17.0.8"
port="5432"
def formatted2_previous_date(d):
if d==0:
return datetime.date.today().strftime("%Y%m%d")
@ -68,14 +80,15 @@ def build_query_string(params):
) for k, v in sorted_params.items())
return query_string
def request_data_signature_get(scrollId):
""" 查询record 列表 """
def request_data_signature_get(scrollId,d):
print(f'开始请求会话记录数据')
url='https://api-bj.clink.cn/livechat/copy_chat_records'
if scrollId is None:
param={'Timestamp':current_time_utc,'Expires':86400,'date':formatted2_previous_date(0),'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100}
param={'Timestamp':current_time_utc,'Expires':86400,'date':formatted2_previous_date(d),'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100}
# param={'Timestamp':current_time_utc,'Expires':86400,'date':'20240801','AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100}
else:
param={'Timestamp':current_time_utc,'Expires':86400,'date':formatted2_previous_date(0),'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100,'scrollId':scrollId}
param={'Timestamp':current_time_utc,'Expires':86400,'date':formatted2_previous_date(d),'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100,'scrollId':scrollId}
# param={'Timestamp':current_time_utc,'Expires':86400,'date':'20240801','AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100,'scrollId':scrollId}
print(f'param: {param}')
url_path = build_query_string(param)
@ -102,6 +115,7 @@ def request_data_signature_get(scrollId):
print(dataReqL)
return resD
""" 查询消息详情 """
def request_detail_signature_get(id):
print(f'开始请求会话详情数据:{id}')
url='https://api-bj.clink.cn/livechat/list_chat_messages'
@ -121,21 +135,31 @@ def request_detail_signature_get(id):
dataReqL=requests.get(url,headers={},params={})
resText = dataReqL.text
i = 0
while 'error' in resText and i < 5:
print(f'请求会话详情失败,再次请求第{i+1}')
time.sleep(1)
dataReqL=requests.get(url,headers={},params={})
resText = dataReqL.text
i = i + 1
if 'records' not in resText:
while 'error' in resText and i < 5:
print(f'请求会话详情失败,再次请求第{i+1}')
time.sleep(1)
dataReqL=requests.get(url,headers={},params={})
resText = dataReqL.text
i = i + 1
resD=json.loads(resText)
if i==5 and 'error' in resText:
print(f'请求会话详情失败,异常信息:{resD}')
return resD
def get_data_from_db():
conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
cur=conn.cursor()
sql = "select * from data_api.tr_chat_record_ids"
cur.execute(sql)
records = cur.fetchall()
conn.commit()
cur.close()
conn.close()
return records
def load_data_to_db(dataList):
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
print('数据库连接成功')
dataId=str(uuid.uuid4())
total=len(dataList)
@ -150,7 +174,7 @@ def load_data_to_db(dataList):
print('加载数据结束chat_records:查询会话记录列表')
def load_detail_data_to_db(ids, dataList):
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
print('数据库连接成功')
dataId=str(uuid.uuid4())
total=len(dataList)
@ -168,7 +192,7 @@ def load_detail_data_to_db(ids, dataList):
def load_detail_exp_to_db(id):
try:
print(f'添加消息异常记录:{id}')
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432")
conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
print('数据库连接成功')
dataId=str(uuid.uuid4())
print('临时id'+dataId)
@ -181,14 +205,11 @@ def load_detail_exp_to_db(id):
print(f'添加消息异常记录:{id} 结束')
except Exception as e:
print(f'添加消息异常记录:{id}失败, 错误信息:{e}')
if __name__ == "__main__":
print(f'{formatted2_previous_hour(0)}开始请求会话信息')
hour_delta = 1
previous_time = previous_hour_timestamp(hour_delta)
resL = request_data_signature_get(None)
def fetch_records_all(d):
resL = request_data_signature_get(None,d)
# print(resL)
list = []
if 'error' in resL:
error = resL['error']
print(f'请求会话列表失败,失败原因:{error}')
@ -199,8 +220,8 @@ if __name__ == "__main__":
i = 1
while scrollId is not None:
i = i+1
resN = request_data_signature_get(scrollId)
if 'error' in resN:
resN = request_data_signature_get(scrollId,d)
if 'records' not in resN:
error = resL['error']
print(f'请求会话列表失败,失败原因:{error}')
break
@ -212,32 +233,37 @@ if __name__ == "__main__":
if len(nextList) < 100:
break
print(f'records会话记录数为{len(list)}, 共请求{i}次会话')
# load_data_to_db(list)
return list
if __name__ == "__main__":
print(f'{formatted2_previous_hour(0)}开始请求会话信息')
hour_delta = 1
previous_time = previous_hour_timestamp(hour_delta)
list = fetch_records_all(0)
if datetime.datetime.now().hour < 3:
pre_list = fetch_records_all(1)
list = list + pre_list
load_data_to_db(list)
records = get_data_from_db()
if len(records) > 0:
print(f'此处需处理{len(records)}条绘画信息信息')
detailDataList = []
actList = []
ids = []
j = 0
k = 0
l = 0
for data in list:
for data in records:
try:
id = data['mainUniqueId']
startTime = data['startTime']
endTime = data['endTime']
print(f'对比时间:{previous_time},数据开始时间:{startTime},会话结束时间:{endTime}')
if startTime < previous_time and endTime < previous_time:
print(f'该id不在采集{hour_delta}个小时内,不做采集{id}')
continue
actList.append(data)
id = data[0]
j = j + 1
print(f'{j}. 开始请求会话详情数据:{id}')
resD = request_detail_signature_get(id)
#print(f"请求工单详情结束")
if 'records' in resD:
k = k + 1
ids.append(id)
dataList = resD['records']
# detailDataList.append(dataList)
detailDataList = detailDataList+dataList
print(f'{j}. deltail: {len(dataList)}')
else:
@ -249,10 +275,8 @@ if __name__ == "__main__":
print(f'请求会话详情id:{id})异常, )异常信息:{e}')
load_detail_exp_to_db(data['id'])
print(f'{j}. exp end')
print(f'实际加载{j}-{len(actList)}{len(detailDataList)} - {k}-{l} 条记录')
load_data_to_db(actList)
if len(ids) > 0:
ids_str = [str(item) for item in ids]
# print(f'会话详情{ids_str}')
load_detail_data_to_db(ids_str,detailDataList)
print(f'{formatted2_previous_hour(0)}请求会话信息结束')
print(f'实际加载{j}-{len(records)}{len(detailDataList)} - {k}-{l} 条记录')
print(f'{formatted2_previous_hour(0)}请求会话信息结束')