From 9722349fd7974a77fbd2f276281acfa920653fde Mon Sep 17 00:00:00 2001
From: root
Date: Sat, 6 Dec 2025 14:24:03 +0800
Subject: [PATCH] =?UTF-8?q?add=20workflow=20=E5=AE=A2=E6=88=B7=E4=BF=A1?=
 =?UTF-8?q?=E6=81=AF=E8=A1=A5=E5=85=A8=E3=80=90156,dev?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Reviewer notes (this section is ignored by git am):
- Line breaks and indentation were reconstructed from a flattened copy of the
  patch; verify the context lines against the target file before applying.
- The CTE now orders by dw_account before "limit 100" so the selected rows
  line up with the ROW_NUMBER() batch numbering.
- process_single_item shares one success/failure logging path for both modes.
- NOTE(review): the API bearer token and the database URL comment embed
  credentials in source; consider moving them to configuration.

 .../account_info_complete.py                  | 141 +++++++++++++-----
 1 file changed, 106 insertions(+), 35 deletions(-)

diff --git a/dev/workflow/TK_Cust/account_info_complete/【156】客户基础信息补全/account_info_complete.py b/dev/workflow/TK_Cust/account_info_complete/【156】客户基础信息补全/account_info_complete.py
index cc8aaa4..14f5ca6 100644
--- a/dev/workflow/TK_Cust/account_info_complete/【156】客户基础信息补全/account_info_complete.py
+++ b/dev/workflow/TK_Cust/account_info_complete/【156】客户基础信息补全/account_info_complete.py
@@ -5,6 +5,8 @@ from psycopg2.extras import RealDictCursor
 import time
 from typing import List, Dict, Any
 import logging
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import threading
 
 # 配置日志
 logging.basicConfig(
@@ -37,6 +39,7 @@ class APIClient:
             "response_mode": "blocking",
             "user": "admin"
         })
+        logger.info(f"调用API,payload: {payload}")
 
         try:
             logger.info("调用带inputs参数的API")
@@ -44,7 +47,7 @@ class APIClient:
                 self.api_url,
                 headers=self.headers,
                 data=payload,
-                timeout=1200
+                timeout=2400
             )
 
             response.raise_for_status()
@@ -63,6 +66,7 @@ class APIClient:
                 "error": str(e)
             }
 
+
     def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
         """直接调用不带inputs参数的API
 
@@ -244,10 +248,52 @@ class DatabaseManager:
         except Exception as e:
             logger.error(f"查询数据库失败: {e}")
             return []
+
+def process_single_item(api_client, item_text, type, item_index, total_count):
+    """Send one work item to the API and log the outcome.
+
+    Args:
+        api_client: Client exposing call_api_with_inputs / call_api_with_query.
+        item_text: Text of the item; sent as the 'companys' input in workflow
+            mode, or as the query in agent mode.
+        type: 'workflow' selects call_api_with_inputs; any other value
+            selects call_api_with_query.
+        item_index: 1-based position of the item, used only for logging.
+        total_count: Total number of items, used only for logging.
+
+    Returns:
+        The dict returned by the API client, or
+        {"success": False, "error": <message>} if an exception was raised.
+    """
+    logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item_text}")
+
+    try:
+        if type == 'workflow':
+            # Workflow mode: the text travels in the "inputs" payload.
+            result = api_client.call_api_with_inputs({'companys': f"{item_text}"})
+        else:
+            # Agent mode: the text is sent as the query.
+            result = api_client.call_api_with_query(item_text)
+
+        # .get() so a response without a "success" key is reported as a
+        # failure instead of surfacing as a KeyError.
+        if result.get('success'):
+            logger.info(f"数据 {item_text} 处理成功")
+        else:
+            logger.error(f"数据 {item_text} 处理失败: {result.get('error')}")
+        return result
+
+    except Exception as e:
+        logger.error(f"处理数据 {item_text} 时发生异常: {e}")
+        return {"success": False, "error": str(e)}
+
 #DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp
 #DATABASE_SCHEMA=p70_ai_intelligence
 def main():
     """主函数"""
+    # Number of API calls issued concurrently
+    MAX_WORKERS = 10
+
     # 数据库配置
     db_config = {
         'host': '124.221.232.219',
@@ -260,8 +306,8 @@ def main():
 
     # API配置
     api_config = {
-        'url': 'https://tk-agent.idgvalue.com/v1/workflows/run',
-        'auth_token': 'Bearer app-pv9rLM4ukTRayP2YD60cS4k5'
+        'url': 'https://agent.idgvalue.com/v1/workflows/run',
+        'auth_token': 'Bearer app-AXKHfJwq6SdZrRMmULh8xONU'
     }
 
     api_client = APIClient(api_config['url'], api_config['auth_token'])
@@ -275,7 +321,34 @@ def main():
     if flag:
         # 初始化
         db_manager = DatabaseManager(db_config)
-        custom_query = """ ${custom_query} """
+        # Pick up to 100 names absent from agent_account_info / agent_execp_account
+        # and aggregate them 10 per row; ORDER BY before LIMIT keeps the picked rows
+        # aligned with the ROW_NUMBER() batch numbering.
+        custom_query = """
+WITH numbered_names AS (
+    SELECT
+        name,
+        (ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /10 as batch_num
+    FROM p30_common.v_sql_cleaned_cn_d_account_info
+    WHERE name NOT IN (
+        SELECT "search"
+        FROM p70_ai_intelligence.agent_account_info
+        WHERE "search" IS NOT NULL
+    )
+    and name NOT IN (
+        SELECT name
+        FROM p70_ai_intelligence.agent_execp_account
+        WHERE name IS NOT NULL
+    )
+    ORDER BY dw_account
+    limit 100
+)
+SELECT
+    STRING_AGG(name, E'\n') as batched_names
+FROM numbered_names
+GROUP BY batch_num
+ORDER BY batch_num;
+"""
 
         try:
             # 从数据库获取URL列表
@@ -285,37 +358,35 @@ def main():
                 logger.warning("未获取到URL")
                 return
 
-            # 遍历每个URL调用API
-            for i, text in enumerate(list, 1):
-                logger.info(f"处理第 {i}/{len(list)} 个数据: {text}")
+            # Each fetched row is submitted whole as one task; it is not split on
+            # '\n' into individual names (per the query above a row is presumably
+            # a newline-joined batch of names - TODO confirm).
+            logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程")
+
+            success_count = 0
+            failed_count = 0
+
+            with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+                # Submit every item up front and remember which future belongs to which item
+                future_to_item = {
+                    executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item
+                    for i, item in enumerate(list)
+                }
 
-                if type is 'workflow':
-                    # 方法1: 使用带inputs参数的调用
-                    inputs_data = {
-                        "urls": f"{text}"
-                    }
-
-                    result = api_client.call_api_with_inputs(inputs_data)
-
-                    if result['success']:
-                        logger.info(f"URL {text} 处理成功")
-                    else:
-                        logger.error(f"URL {text} 处理失败: {result.get('error')}")
-
-
-                else:
-                    #agent
-                    query = text
-                    result = api_client.call_api_with_query(query)
-
-                    if result['success']:
-                        logger.info(f"URL {text} 处理成功")
-                    else:
-                        logger.error(f"URL {text} 处理失败: {result.get('error')}")
-
-                # 可选:添加延迟避免请求过于频繁
-                if i < len(text):
-                    time.sleep(1)
+                # Tally outcomes in completion order
+                for future in as_completed(future_to_item):
+                    item = future_to_item[future]
+                    try:
+                        result = future.result()
+                        if result.get("success"):
+                            success_count += 1
+                        else:
+                            failed_count += 1
+                    except Exception as e:
+                        logger.error(f"处理项目 {item} 时发生异常: {e}")
+                        failed_count += 1
+
+            logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}")
 
         except Exception as e:
             logger.error(f"程序执行失败: {e}")
@@ -325,7 +396,7 @@ def main():
 
     else:
         logger.info("调用不带inputs参数的API示例")
-        if type is 'workflow':
+        if type == 'workflow':
             result2 = api_client.call_api_without_inputs()
 
             if result2['success']: