From aea9d4c40ef57a8cf42369e10a0a12830fe80787 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 8 Dec 2025 16:45:50 +0800 Subject: [PATCH] =?UTF-8?q?add=20workflow=20Append-=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E8=A1=A5=E5=85=A8,dev?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../156_account_complete_append.py | 154 ++++++------------ 1 file changed, 47 insertions(+), 107 deletions(-) diff --git a/dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py b/dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py index 6182c05..0a3cb57 100644 --- a/dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py +++ b/dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py @@ -5,8 +5,6 @@ from psycopg2.extras import RealDictCursor import time from typing import List, Dict, Any import logging -from concurrent.futures import ThreadPoolExecutor, as_completed -import threading # 配置日志 logging.basicConfig( @@ -34,41 +32,30 @@ class APIClient: Returns: API响应数据 """ - payload = { + payload = json.dumps({ "inputs": inputs, "response_mode": "blocking", "user": "admin" - } - logger.info(f"调用API,payload: {json.dumps(payload, ensure_ascii=False)}") + }) try: logger.info("调用带inputs参数的API") response = requests.post( self.api_url, headers=self.headers, - json=payload, # Using json parameter instead of data to let requests handle serialization - timeout=300 # Reduced timeout to more reasonable value + data=payload, + timeout=1200 ) - logger.info(f"API调用完成,状态码: {response.status_code}") - logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response - response.raise_for_status() logger.info(f"API调用成功,状态码: {response.status_code}") return { "success": True, "status_code": response.status_code, - "data": response.json() if response.content else {} + "data": response.json() } - except requests.exceptions.HTTPError as e: - logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") - return { - "success": False, - "error": f"HTTP {e.response.status_code}: {e.response.text}", - "status_code": e.response.status_code - } except requests.exceptions.RequestException as e: logger.error(f"API调用失败: {e}") return { @@ -76,7 +63,6 @@ class APIClient: "error": str(e) } - def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: """直接调用不带inputs参数的API @@ -102,7 +88,7 @@ class APIClient: self.api_url, headers=self.headers, data=json.dumps(payload), - timeout=300 + timeout=1200 ) response.raise_for_status() @@ -144,7 +130,7 @@ class APIClient: self.api_url, headers=self.headers, data=payload, - timeout=300 + timeout=2400 ) response.raise_for_status() @@ -237,8 +223,8 @@ class DatabaseManager: self.connection.close() logger.info("数据库连接已关闭") - def get_urls_from_database(self, query: str = None) -> List[Dict[str, Any]]: - """从数据库获取数据列表""" + def get_urls_from_database(self, query: str = None) -> List[str]: + """从数据库获取URL列表""" if not self.connection: self.connect() @@ -251,60 +237,17 @@ class DatabaseManager: cursor.execute(query) results = cursor.fetchall() - # 直接返回字典列表,每个字典包含 id 和 address - return [dict(row) for row in results] + # 处理结果,返回字符串列表 + datas = [list(row.values())[0] for row in results] + return datas except Exception as e: logger.error(f"查询数据库失败: {e}") return [] - -def process_single_item(api_client, item, type, item_index, total_count): - """处理单个项目""" - logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item}") - - try: - if type == 'workflow': - # item 是字典,包含 id 和 address - inputs_data = { - 'company1': f"{item.get('company1', '')}", - 'address1': f"{item.get('address1', '')}", - 'city1':f"{item.get('city1', '')}", - 'company2': f"{item.get('company2', '')}", - 'address2': f"{item.get('address2', '')}", - 'city2':f"{item.get('city2', '')}" - } - - result = api_client.call_api_with_inputs(inputs_data) - - if result['success']: - logger.info(f"数据 {item} 处理成功") - else: - logger.error(f"数据 {item} 处理失败: {result.get('error')}") - return result - - else: - #agent - # 将字典转换为字符串查询 - query = f"ID: {item.get('id', '')}, 地址: {item.get('address', '')}" - result = api_client.call_api_with_query(query) - - if result['success']: - logger.info(f"数据 {item} 处理成功") - else: - logger.error(f"数据 {item} 处理失败: {result.get('error')}") - return result - - except Exception as e: - logger.error(f"处理数据 {item} 时发生异常: {e}") - return {"success": False, "error": str(e)} - #DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp #DATABASE_SCHEMA=p70_ai_intelligence def main(): """主函数""" - # 配置并发数 - MAX_WORKERS = 10 # 可调整为5或10 - # 数据库配置 db_config = { 'host': '124.221.232.219', @@ -332,8 +275,7 @@ def main(): if flag: # 初始化 db_manager = DatabaseManager(db_config) - custom_query = """ -SELECT + custom_query = """ SELECT t2.name as company1, t2.city_name as city1, t2.address_detail as address1, @@ -345,11 +287,11 @@ t1.address as address2 FROM p70_ai_intelligence.agent_account_info t1 left join p30_common.v_sql_cleaned_cn_d_account_info t2 on t2."name" = t1."search" - where t1.city <> t2.city_name + where ((t1.city not like t2.city_name||'%' and t2.city_name <>'') or t1.account not like t2.name||'%') and t2.name not in (select search from p70_ai_intelligence.agent_account_info_append) - limit 100 -""" + + """ try: # 从数据库获取URL列表 @@ -359,39 +301,37 @@ t1.address as address2 logger.warning("未获取到URL") return - # 展平所有数据项为单个列表 - # all_items = [] - # for batch in list: - # items = batch.split('\n') - # all_items.extend(items) - - logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程") - - # 使用线程池并发处理 - success_count = 0 - failed_count = 0 - - with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - # 提交所有任务 - future_to_item = { - executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item - for i, item in enumerate(list) - } + # 遍历每个URL调用API + for i, text in enumerate(list, 1): + logger.info(f"处理第 {i}/{len(list)} 个数据: {text}") - # 收集结果 - for future in as_completed(future_to_item): - item = future_to_item[future] - try: - result = future.result() - if result.get("success"): - success_count += 1 - else: - failed_count += 1 - except Exception as e: - logger.error(f"处理项目 {item} 时发生异常: {e}") - failed_count += 1 - - logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}") + if type is 'workflow': + # 方法1: 使用带inputs参数的调用 + inputs_data = { + "urls": f"{text}" + } + + result = api_client.call_api_with_inputs(inputs_data) + + if result['success']: + logger.info(f"URL {text} 处理成功") + else: + logger.error(f"URL {text} 处理失败: {result.get('error')}") + + + else: + #agent + query = text + result = api_client.call_api_with_query(query) + + if result['success']: + logger.info(f"URL {text} 处理成功") + else: + logger.error(f"URL {text} 处理失败: {result.get('error')}") + + # 可选:添加延迟避免请求过于频繁 + if i < len(text): + time.sleep(1) except Exception as e: logger.error(f"程序执行失败: {e}") @@ -401,7 +341,7 @@ t1.address as address2 else: logger.info("调用不带inputs参数的API示例") - if type == 'workflow': + if type is 'workflow': result2 = api_client.call_api_without_inputs() if result2['success']: