From 30ec909d9e4903827632ce526c19ee455e67fdd0 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 16 Dec 2025 14:58:43 +0800 Subject: [PATCH] =?UTF-8?q?add=20workflow=20=E5=B8=82=E5=9C=BANews-Agents?= =?UTF-8?q?=E8=B0=83=E5=BA=A6,dev?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../泰克产品相关性分析/tk_product_prefer.py | 194 ++++++++++++++---- 1 file changed, 157 insertions(+), 37 deletions(-) diff --git a/dev/workflow/TK_Cust/agents_market_newsletter/泰克产品相关性分析/tk_product_prefer.py b/dev/workflow/TK_Cust/agents_market_newsletter/泰克产品相关性分析/tk_product_prefer.py index 8bccc31..7afc8b9 100644 --- a/dev/workflow/TK_Cust/agents_market_newsletter/泰克产品相关性分析/tk_product_prefer.py +++ b/dev/workflow/TK_Cust/agents_market_newsletter/泰克产品相关性分析/tk_product_prefer.py @@ -5,6 +5,8 @@ from psycopg2.extras import RealDictCursor import time from typing import List, Dict, Any import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +import threading # 配置日志 logging.basicConfig( @@ -39,29 +41,53 @@ class APIClient: "user": "admin" }) + start_time = time.time() try: - logger.info("调用带inputs参数的API") + logger.info(f"调用带inputs参数的API: {self.api_url},{inputs}") response = requests.post( self.api_url, headers=self.headers, data=payload, - timeout=2400 + timeout=2400, + stream=True # 流式响应需要设置stream=True ) response.raise_for_status() - logger.info(f"API调用成功,状态码: {response.status_code}") + elapsed_time = time.time() - start_time + + # 正确处理流式响应 + full_response = "" + for line in response.iter_lines(): + if line: + decoded_line = line.decode('utf-8') + if decoded_line.startswith("data: "): + json_str = decoded_line[6:] # 移除 "data: " 前缀 + if json_str != "[DONE]": + try: + chunk_data = json.loads(json_str) + if 'answer' in chunk_data: + full_response += chunk_data['answer'] + except json.JSONDecodeError: + pass # 忽略解析错误 + + logger.info(f"API调用成功,状态码: {response.status_code}, 耗时: {elapsed_time:.2f}秒, 接口: {self.api_url}") return { "success": True, "status_code": response.status_code, - "data": response.json() + "data": {"answer": full_response}, + "server": self.api_url, + "elapsed_time": elapsed_time } except requests.exceptions.RequestException as e: - logger.error(f"API调用失败: {e}") + elapsed_time = time.time() - start_time + logger.error(f"API调用失败: {e}, 耗时: {elapsed_time:.2f}秒, 接口: {self.api_url}") return { "success": False, - "error": str(e) + "error": str(e), + "server": self.api_url, + "elapsed_time": elapsed_time } def call_api_without_inputs(self, other_params: Dict[str, Any] = None, query:str = None) -> Dict[str, Any]: @@ -84,8 +110,9 @@ class APIClient: if other_params: payload.update(other_params) + start_time = time.time() try: - logger.info("调用不带inputs参数的API") + logger.info(f"调用不带inputs参数的API: {self.api_url}") response = requests.post( self.api_url, headers=self.headers, @@ -94,19 +121,25 @@ class APIClient: ) response.raise_for_status() - logger.info(f"API调用成功,状态码: {response.status_code}") + elapsed_time = time.time() - start_time + logger.info(f"API调用成功,状态码: {response.status_code}, 耗时: {elapsed_time:.2f}秒, 接口: {self.api_url}") return { "success": True, "status_code": response.status_code, - "data": response.json() + "data": response.json(), + "server": self.api_url, + "elapsed_time": elapsed_time } except requests.exceptions.RequestException as e: - logger.error(f"API调用失败: {e}") + elapsed_time = time.time() - start_time + logger.error(f"API调用失败: {e}, 耗时: {elapsed_time:.2f}秒, 接口: {self.api_url}") return { "success": False, - "error": str(e) + "error": str(e), + "server": self.api_url, + "elapsed_time": elapsed_time } class DatabaseManager: @@ -161,6 +194,76 @@ class DatabaseManager: return [] #DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp #DATABASE_SCHEMA=p70_ai_intelligence +class LoadBalancer: + """负载均衡器类""" + + def __init__(self, servers: List[Dict[str, str]]): + self.servers = servers + self.index = 0 + self.lock = threading.Lock() + + def get_next_server(self) -> Dict[str, str]: + """轮询方式获取下一个服务器""" + with self.lock: + server = self.servers[self.index] + self.index = (self.index + 1) % len(self.servers) + return server + +class MultiAPIClient: + """多API客户端类,支持负载均衡和并发调用""" + + def __init__(self, servers: List[Dict[str, str]], max_workers: int = 5): + self.load_balancer = LoadBalancer(servers) + self.max_workers = max_workers + + def call_api_with_inputs(self, inputs: Dict[str, Any] = None, query: str = None) -> Dict[str, Any]: + """通过负载均衡选择服务器并调用API""" + server = self.load_balancer.get_next_server() + client = APIClient(server['url'], server['auth_token']) + return client.call_api_with_inputs(inputs, query) + + def process_batch(self, items: List[str]) -> List[Dict[str, Any]]: + """并发处理一批数据""" + results = [] + start_time = time.time() + + logger.info(f"开始并发处理 {len(items)} 个数据项,最大并发数: {self.max_workers}") + + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + # 提交所有任务 + future_to_item = { + executor.submit(self._process_single_item, item): item + for item in items + } + + # 收集结果 + for future in as_completed(future_to_item): + item = future_to_item[future] + try: + result = future.result() + results.append(result) + if result.get('success'): + logger.info(f"完成处理: {item}, 使用服务器: {result.get('server')}, 耗时: {result.get('elapsed_time', 0):.2f}秒") + else: + logger.error(f"处理失败: {item}, 使用服务器: {result.get('server')}, 错误: {result.get('error')}") + except Exception as e: + logger.error(f"处理 {item} 时发生异常: {e}") + results.append({ + "item": item, + "success": False, + "error": str(e) + }) + + total_elapsed_time = time.time() - start_time + logger.info(f"批量处理完成,总共处理: {len(results)} 项,成功: {sum(1 for r in results if r.get('success', False))} 项,总耗时: {total_elapsed_time:.2f}秒") + + return results + + def _process_single_item(self, item: str) -> Dict[str, Any]: + """处理单个数据项""" + inputs_data = {} + return self.call_api_with_inputs(inputs_data, item) + def main(): """主函数""" # 数据库配置 @@ -173,13 +276,36 @@ def main(): 'schema': 'p70_ai_intelligence' } - # API配置 - api_config = { - 'url': 'https://agent.idgvalue.com/v1/chat-messages', - 'auth_token': 'Bearer app-yAPAzkPbAaV8l1SuZBjXHTU4' - } - - api_client = APIClient(api_config['url'], api_config['auth_token']) + # 多个API服务器配置 + api_servers = [ + # 可以在这里添加更多服务器 + { + 'url': 'https://agent1.idgvalue.com/v1/chat-messages', + 'auth_token': 'Bearer app-99g3fpcZcbtNHdPIt5RmKbOJ' + }, + { + 'url': 'https://agent2.idgvalue.com/v1/chat-messages', + 'auth_token': 'Bearer app-iU3FP9Vs1oLTXmKY0q8kqRZy' + }, + + { + 'url': 'https://agent3.idgvalue.com/v1/chat-messages', + 'auth_token': 'Bearer app-uZtUYnhux8ubLFhum3bpW9Kc' + }, + + { + 'url': 'https://agent.idgvalue.com/v1/chat-messages', + 'auth_token': 'Bearer app-yAPAzkPbAaV8l1SuZBjXHTU4' + }, + { + 'url': 'https://agent1.idgvalue.com/v1/chat-messages', + 'auth_token': 'Bearer app-6HIbbN1vV4oerlKAwd99KXh6' + }, + { + 'url': 'https://agent2.idgvalue.com/v1/chat-messages', + 'auth_token': 'Bearer app-WG4dQBxfqpI2r6Zha2csymp7' + }, + ] try: flag = True @@ -194,30 +320,22 @@ def main(): try: # 从数据库获取URL列表 - list = db_manager.get_urls_from_database(custom_query) + list_items = db_manager.get_urls_from_database(custom_query) - if not list: + if not list_items: logger.warning("未获取到URL") return - # 遍历每个URL调用API - for i, text in enumerate(list, 1): - logger.info(f"处理第 {i}/{len(list)} 个数据: {text}") + # 创建多API客户端,支持负载均衡和并发调用 + multi_api_client = MultiAPIClient(api_servers, max_workers=4) + + # 并发处理所有数据 + logger.info(f"开始处理 {len(list_items)} 个数据项") + results = multi_api_client.process_batch(list_items) + + success_count = sum(1 for r in results if r.get('success', False)) + logger.info(f"处理完成,成功: {success_count}/{len(results)}") - # 方法1: 使用带inputs参数的调用 - inputs_data = {} - query = text - - result = api_client.call_api_with_inputs(inputs_data,query) - - if result['success']: - logger.info(f"URL {text} 处理成功") - else: - logger.error(f"URL {text} 处理失败: {result.get('error')}") - - # 可选:添加延迟避免请求过于频繁 - if i < len(text): - time.sleep(1) except Exception as e: logger.error(f"程序执行失败: {e}") @@ -225,6 +343,8 @@ def main(): db_manager.disconnect() else: + # 单个API客户端示例 + api_client = APIClient(api_servers[0]['url'], api_servers[0]['auth_token']) logger.info("调用不带inputs参数的API示例") result2 = api_client.call_api_without_inputs()