diff --git a/dev/workflow/TK_Cust/smart_ccc_chat/查询会话记录列表/chat_records_feign.py b/dev/workflow/TK_Cust/smart_ccc_chat/查询会话记录列表/chat_records_feign.py index cdb69d2..5bc7722 100644 --- a/dev/workflow/TK_Cust/smart_ccc_chat/查询会话记录列表/chat_records_feign.py +++ b/dev/workflow/TK_Cust/smart_ccc_chat/查询会话记录列表/chat_records_feign.py @@ -26,6 +26,18 @@ current_time_utc =( datetime.datetime.now() - datetime.timedelta(hours=8)).strft formatted2_current_date = current_date.strftime("%Y-%m-%d %H:%M:%S") # 获取当前日期 - 标准化 +# database="dataops_db" +# user="dbuser_dba" +# password="EmBRxnmmjnE3" +# host="124.221.232.219" +# port="5432" + +database="dataops_db" +user="dbuser_dops" +password="MIgTi3jA" +host="172.17.0.8" +port="5432" + def formatted2_previous_date(d): if d==0: return datetime.date.today().strftime("%Y%m%d") @@ -68,14 +80,15 @@ def build_query_string(params): ) for k, v in sorted_params.items()) return query_string -def request_data_signature_get(scrollId): +""" 查询record 列表 """ +def request_data_signature_get(scrollId,d): print(f'开始请求会话记录数据') url='https://api-bj.clink.cn/livechat/copy_chat_records' if scrollId is None: - param={'Timestamp':current_time_utc,'Expires':86400,'date':formatted2_previous_date(0),'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100} + param={'Timestamp':current_time_utc,'Expires':86400,'date':formatted2_previous_date(d),'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100} # param={'Timestamp':current_time_utc,'Expires':86400,'date':'20240801','AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100} else: - param={'Timestamp':current_time_utc,'Expires':86400,'date':formatted2_previous_date(0),'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100,'scrollId':scrollId} + param={'Timestamp':current_time_utc,'Expires':86400,'date':formatted2_previous_date(d),'AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100,'scrollId':scrollId} # param={'Timestamp':current_time_utc,'Expires':86400,'date':'20240801','AccessKeyId':'b17759d3a36fba9a2cf522fbf4cbf177','limit':100,'scrollId':scrollId} print(f'param: {param}') url_path = build_query_string(param) @@ -102,6 +115,7 @@ def request_data_signature_get(scrollId): print(dataReqL) return resD +""" 查询消息详情 """ def request_detail_signature_get(id): print(f'开始请求会话详情数据:{id}') url='https://api-bj.clink.cn/livechat/list_chat_messages' @@ -121,21 +135,31 @@ def request_detail_signature_get(id): dataReqL=requests.get(url,headers={},params={}) resText = dataReqL.text i = 0 - while 'error' in resText and i < 5: - print(f'请求会话详情失败,再次请求第{i+1}次') - time.sleep(1) - dataReqL=requests.get(url,headers={},params={}) - resText = dataReqL.text - i = i + 1 + if 'records' not in resText: + while 'error' in resText and i < 5: + print(f'请求会话详情失败,再次请求第{i+1}次') + time.sleep(1) + dataReqL=requests.get(url,headers={},params={}) + resText = dataReqL.text + i = i + 1 resD=json.loads(resText) if i==5 and 'error' in resText: print(f'请求会话详情失败,异常信息:{resD}') return resD - +def get_data_from_db(): + conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) + cur=conn.cursor() + sql = "select * from data_api.tr_chat_record_ids" + cur.execute(sql) + records = cur.fetchall() + conn.commit() + cur.close() + conn.close() + return records def load_data_to_db(dataList): - conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432") + conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) print('数据库连接成功') dataId=str(uuid.uuid4()) total=len(dataList) @@ -150,7 +174,7 @@ def load_data_to_db(dataList): print('加载数据结束:chat_records:查询会话记录列表') def load_detail_data_to_db(ids, dataList): - conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432") + conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) print('数据库连接成功') dataId=str(uuid.uuid4()) total=len(dataList) @@ -168,7 +192,7 @@ def load_detail_data_to_db(ids, dataList): def load_detail_exp_to_db(id): try: print(f'添加消息异常记录:{id}') - conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",host="172.17.0.8", port="5432") + conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) print('数据库连接成功') dataId=str(uuid.uuid4()) print('临时id:'+dataId) @@ -181,14 +205,11 @@ def load_detail_exp_to_db(id): print(f'添加消息异常记录:{id} 结束') except Exception as e: print(f'添加消息异常记录:{id}失败, 错误信息:{e}') - -if __name__ == "__main__": - print(f'{formatted2_previous_hour(0)}开始请求会话信息') - hour_delta = 1 - previous_time = previous_hour_timestamp(hour_delta) - resL = request_data_signature_get(None) +def fetch_records_all(d): + resL = request_data_signature_get(None,d) # print(resL) + list = [] if 'error' in resL: error = resL['error'] print(f'请求会话列表失败,失败原因:{error}') @@ -199,8 +220,8 @@ if __name__ == "__main__": i = 1 while scrollId is not None: i = i+1 - resN = request_data_signature_get(scrollId) - if 'error' in resN: + resN = request_data_signature_get(scrollId,d) + if 'records' not in resN: error = resL['error'] print(f'请求会话列表失败,失败原因:{error}') break @@ -212,32 +233,37 @@ if __name__ == "__main__": if len(nextList) < 100: break print(f'records会话记录数为:{len(list)}, 共请求{i}次会话') - # load_data_to_db(list) + return list + + +if __name__ == "__main__": + print(f'{formatted2_previous_hour(0)}开始请求会话信息') + hour_delta = 1 + previous_time = previous_hour_timestamp(hour_delta) + list = fetch_records_all(0) + + if datetime.datetime.now().hour < 3: + pre_list = fetch_records_all(1) + list = list + pre_list + load_data_to_db(list) + records = get_data_from_db() + if len(records) > 0: + print(f'此处需处理{len(records)}条绘画信息信息') detailDataList = [] - actList = [] ids = [] j = 0 k = 0 l = 0 - for data in list: + for data in records: try: - id = data['mainUniqueId'] - startTime = data['startTime'] - endTime = data['endTime'] - print(f'对比时间:{previous_time},数据开始时间:{startTime},会话结束时间:{endTime}') - if startTime < previous_time and endTime < previous_time: - print(f'该id不在采集{hour_delta}个小时内,不做采集{id}') - continue - actList.append(data) + id = data[0] j = j + 1 print(f'{j}. 开始请求会话详情数据:{id}') resD = request_detail_signature_get(id) - #print(f"请求工单详情结束") if 'records' in resD: k = k + 1 ids.append(id) dataList = resD['records'] - # detailDataList.append(dataList) detailDataList = detailDataList+dataList print(f'{j}. deltail: {len(dataList)}') else: @@ -249,10 +275,8 @@ if __name__ == "__main__": print(f'请求会话详情(id:{id})异常, )异常信息:{e}') load_detail_exp_to_db(data['id']) print(f'{j}. exp end') - print(f'实际加载{j}-{len(actList)},{len(detailDataList)} - {k}-{l} 条记录') - load_data_to_db(actList) if len(ids) > 0: ids_str = [str(item) for item in ids] - # print(f'会话详情{ids_str}') load_detail_data_to_db(ids_str,detailDataList) - print(f'{formatted2_previous_hour(0)}请求会话信息结束') \ No newline at end of file + print(f'实际加载{j}-{len(records)},{len(detailDataList)} - {k}-{l} 条记录') + print(f'{formatted2_previous_hour(0)}请求会话信息结束') \ No newline at end of file