diff --git a/dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py b/dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py index 0a3cb57..ac9eded 100644 --- a/dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py +++ b/dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py @@ -5,6 +5,8 @@ from psycopg2.extras import RealDictCursor import time from typing import List, Dict, Any import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +import threading # 配置日志 logging.basicConfig( @@ -32,30 +34,41 @@ class APIClient: Returns: API响应数据 """ - payload = json.dumps({ + payload = { "inputs": inputs, "response_mode": "blocking", "user": "admin" - }) + } + logger.info(f"调用API,payload: {json.dumps(payload, ensure_ascii=False)}") try: logger.info("调用带inputs参数的API") response = requests.post( self.api_url, headers=self.headers, - data=payload, - timeout=1200 + json=payload, # Using json parameter instead of data to let requests handle serialization + timeout=300 # Reduced timeout to more reasonable value ) + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + response.raise_for_status() logger.info(f"API调用成功,状态码: {response.status_code}") return { "success": True, "status_code": response.status_code, - "data": response.json() + "data": response.json() if response.content else {} } + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } except requests.exceptions.RequestException as e: logger.error(f"API调用失败: {e}") return { @@ -63,6 +76,7 @@ class APIClient: "error": str(e) } + def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: """直接调用不带inputs参数的API @@ -88,7 +102,7 @@ class APIClient: self.api_url, headers=self.headers, data=json.dumps(payload), - timeout=1200 + timeout=300 ) response.raise_for_status() @@ -130,7 +144,7 @@ class APIClient: self.api_url, headers=self.headers, data=payload, - timeout=2400 + timeout=300 ) response.raise_for_status() @@ -223,8 +237,8 @@ class DatabaseManager: self.connection.close() logger.info("数据库连接已关闭") - def get_urls_from_database(self, query: str = None) -> List[str]: - """从数据库获取URL列表""" + def get_urls_from_database(self, query: str = None) -> List[Dict[str, Any]]: + """从数据库获取数据列表""" if not self.connection: self.connect() @@ -237,17 +251,60 @@ class DatabaseManager: cursor.execute(query) results = cursor.fetchall() - # 处理结果,返回字符串列表 - datas = [list(row.values())[0] for row in results] - return datas + # 直接返回字典列表,每个字典包含 id 和 address + return [dict(row) for row in results] except Exception as e: logger.error(f"查询数据库失败: {e}") return [] + +def process_single_item(api_client, item, type, item_index, total_count): + """处理单个项目""" + logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item}") + + try: + if type == 'workflow': + # item 是字典,包含 id 和 address + inputs_data = { + 'company1': f"{item.get('company1', '')}", + 'address1': f"{item.get('address1', '')}", + 'city1':f"{item.get('city1', '')}", + 'company2': f"{item.get('company2', '')}", + 'address2': f"{item.get('address2', '')}", + 'city2':f"{item.get('city2', '')}" + } + + result = api_client.call_api_with_inputs(inputs_data) + + if result['success']: + logger.info(f"数据 {item} 处理成功") + else: + logger.error(f"数据 {item} 处理失败: {result.get('error')}") + return result + + else: + #agent + # 将字典转换为字符串查询 + query = f"ID: {item.get('id', '')}, 地址: {item.get('address', '')}" + result = api_client.call_api_with_query(query) + + if result['success']: + logger.info(f"数据 {item} 处理成功") + else: + logger.error(f"数据 {item} 处理失败: {result.get('error')}") + return result + + except Exception as e: + logger.error(f"处理数据 {item} 时发生异常: {e}") + return {"success": False, "error": str(e)} + #DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp #DATABASE_SCHEMA=p70_ai_intelligence def main(): """主函数""" + # 配置并发数 + MAX_WORKERS = 10 # 可调整为5或10 + # 数据库配置 db_config = { 'host': '124.221.232.219', @@ -275,7 +332,8 @@ def main(): if flag: # 初始化 db_manager = DatabaseManager(db_config) - custom_query = """ SELECT + custom_query = """ +SELECT t2.name as company1, t2.city_name as city1, t2.address_detail as address1, @@ -289,9 +347,9 @@ t1.address as address2 on t2."name" = t1."search" where ((t1.city not like t2.city_name||'%' and t2.city_name <>'') or t1.account not like t2.name||'%') and t2.name not in (select search from p70_ai_intelligence.agent_account_info_append) + limit 100 - - """ +""" try: # 从数据库获取URL列表 @@ -301,37 +359,39 @@ t1.address as address2 logger.warning("未获取到URL") return - # 遍历每个URL调用API - for i, text in enumerate(list, 1): - logger.info(f"处理第 {i}/{len(list)} 个数据: {text}") + # 展平所有数据项为单个列表 + # all_items = [] + # for batch in list: + # items = batch.split('\n') + # all_items.extend(items) + + logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程") + + # 使用线程池并发处理 + success_count = 0 + failed_count = 0 + + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + # 提交所有任务 + future_to_item = { + executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item + for i, item in enumerate(list) + } - if type is 'workflow': - # 方法1: 使用带inputs参数的调用 - inputs_data = { - "urls": f"{text}" - } - - result = api_client.call_api_with_inputs(inputs_data) - - if result['success']: - logger.info(f"URL {text} 处理成功") - else: - logger.error(f"URL {text} 处理失败: {result.get('error')}") - - - else: - #agent - query = text - result = api_client.call_api_with_query(query) - - if result['success']: - logger.info(f"URL {text} 处理成功") - else: - logger.error(f"URL {text} 处理失败: {result.get('error')}") - - # 可选:添加延迟避免请求过于频繁 - if i < len(text): - time.sleep(1) + # 收集结果 + for future in as_completed(future_to_item): + item = future_to_item[future] + try: + result = future.result() + if result.get("success"): + success_count += 1 + else: + failed_count += 1 + except Exception as e: + logger.error(f"处理项目 {item} 时发生异常: {e}") + failed_count += 1 + + logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}") except Exception as e: logger.error(f"程序执行失败: {e}") @@ -341,7 +401,7 @@ t1.address as address2 else: logger.info("调用不带inputs参数的API示例") - if type is 'workflow': + if type == 'workflow': result2 = api_client.call_api_without_inputs() if result2['success']: