add workflow 市场News-Agents调度,dev

This commit is contained in:
root 2025-12-16 14:58:43 +08:00
parent 42b59b70b4
commit 30ec909d9e
1 changed files with 157 additions and 37 deletions

View File

@ -5,6 +5,8 @@ from psycopg2.extras import RealDictCursor
import time
from typing import List, Dict, Any
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# 配置日志
logging.basicConfig(
@ -39,29 +41,53 @@ class APIClient:
"user": "admin"
})
start_time = time.time()
try:
logger.info("调用带inputs参数的API")
logger.info(f"调用带inputs参数的API: {self.api_url},{inputs}")
response = requests.post(
self.api_url,
headers=self.headers,
data=payload,
timeout=2400
timeout=2400,
stream=True # 流式响应需要设置stream=True
)
response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}")
elapsed_time = time.time() - start_time
# 正确处理流式响应
full_response = ""
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith("data: "):
json_str = decoded_line[6:] # 移除 "data: " 前缀
if json_str != "[DONE]":
try:
chunk_data = json.loads(json_str)
if 'answer' in chunk_data:
full_response += chunk_data['answer']
except json.JSONDecodeError:
pass # 忽略解析错误
logger.info(f"API调用成功状态码: {response.status_code}, 耗时: {elapsed_time:.2f}秒, 接口: {self.api_url}")
return {
"success": True,
"status_code": response.status_code,
"data": response.json()
"data": {"answer": full_response},
"server": self.api_url,
"elapsed_time": elapsed_time
}
except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}")
elapsed_time = time.time() - start_time
logger.error(f"API调用失败: {e}, 耗时: {elapsed_time:.2f}秒, 接口: {self.api_url}")
return {
"success": False,
"error": str(e)
"error": str(e),
"server": self.api_url,
"elapsed_time": elapsed_time
}
def call_api_without_inputs(self, other_params: Dict[str, Any] = None, query:str = None) -> Dict[str, Any]:
@ -84,8 +110,9 @@ class APIClient:
if other_params:
payload.update(other_params)
start_time = time.time()
try:
logger.info("调用不带inputs参数的API")
logger.info(f"调用不带inputs参数的API: {self.api_url}")
response = requests.post(
self.api_url,
headers=self.headers,
@ -94,19 +121,25 @@ class APIClient:
)
response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}")
elapsed_time = time.time() - start_time
logger.info(f"API调用成功状态码: {response.status_code}, 耗时: {elapsed_time:.2f}秒, 接口: {self.api_url}")
return {
"success": True,
"status_code": response.status_code,
"data": response.json()
"data": response.json(),
"server": self.api_url,
"elapsed_time": elapsed_time
}
except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}")
elapsed_time = time.time() - start_time
logger.error(f"API调用失败: {e}, 耗时: {elapsed_time:.2f}秒, 接口: {self.api_url}")
return {
"success": False,
"error": str(e)
"error": str(e),
"server": self.api_url,
"elapsed_time": elapsed_time
}
class DatabaseManager:
@ -161,6 +194,76 @@ class DatabaseManager:
return []
#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp
#DATABASE_SCHEMA=p70_ai_intelligence
class LoadBalancer:
"""负载均衡器类"""
def __init__(self, servers: List[Dict[str, str]]):
self.servers = servers
self.index = 0
self.lock = threading.Lock()
def get_next_server(self) -> Dict[str, str]:
"""轮询方式获取下一个服务器"""
with self.lock:
server = self.servers[self.index]
self.index = (self.index + 1) % len(self.servers)
return server
class MultiAPIClient:
"""多API客户端类支持负载均衡和并发调用"""
def __init__(self, servers: List[Dict[str, str]], max_workers: int = 5):
self.load_balancer = LoadBalancer(servers)
self.max_workers = max_workers
def call_api_with_inputs(self, inputs: Dict[str, Any] = None, query: str = None) -> Dict[str, Any]:
"""通过负载均衡选择服务器并调用API"""
server = self.load_balancer.get_next_server()
client = APIClient(server['url'], server['auth_token'])
return client.call_api_with_inputs(inputs, query)
def process_batch(self, items: List[str]) -> List[Dict[str, Any]]:
"""并发处理一批数据"""
results = []
start_time = time.time()
logger.info(f"开始并发处理 {len(items)} 个数据项,最大并发数: {self.max_workers}")
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有任务
future_to_item = {
executor.submit(self._process_single_item, item): item
for item in items
}
# 收集结果
for future in as_completed(future_to_item):
item = future_to_item[future]
try:
result = future.result()
results.append(result)
if result.get('success'):
logger.info(f"完成处理: {item}, 使用服务器: {result.get('server')}, 耗时: {result.get('elapsed_time', 0):.2f}")
else:
logger.error(f"处理失败: {item}, 使用服务器: {result.get('server')}, 错误: {result.get('error')}")
except Exception as e:
logger.error(f"处理 {item} 时发生异常: {e}")
results.append({
"item": item,
"success": False,
"error": str(e)
})
total_elapsed_time = time.time() - start_time
logger.info(f"批量处理完成,总共处理: {len(results)} 项,成功: {sum(1 for r in results if r.get('success', False))} 项,总耗时: {total_elapsed_time:.2f}")
return results
def _process_single_item(self, item: str) -> Dict[str, Any]:
"""处理单个数据项"""
inputs_data = {}
return self.call_api_with_inputs(inputs_data, item)
def main():
"""主函数"""
# 数据库配置
@ -173,13 +276,36 @@ def main():
'schema': 'p70_ai_intelligence'
}
# API配置
api_config = {
'url': 'https://agent.idgvalue.com/v1/chat-messages',
'auth_token': 'Bearer app-yAPAzkPbAaV8l1SuZBjXHTU4'
}
# 多个API服务器配置
api_servers = [
# 可以在这里添加更多服务器
{
'url': 'https://agent1.idgvalue.com/v1/chat-messages',
'auth_token': 'Bearer app-99g3fpcZcbtNHdPIt5RmKbOJ'
},
{
'url': 'https://agent2.idgvalue.com/v1/chat-messages',
'auth_token': 'Bearer app-iU3FP9Vs1oLTXmKY0q8kqRZy'
},
api_client = APIClient(api_config['url'], api_config['auth_token'])
{
'url': 'https://agent3.idgvalue.com/v1/chat-messages',
'auth_token': 'Bearer app-uZtUYnhux8ubLFhum3bpW9Kc'
},
{
'url': 'https://agent.idgvalue.com/v1/chat-messages',
'auth_token': 'Bearer app-yAPAzkPbAaV8l1SuZBjXHTU4'
},
{
'url': 'https://agent1.idgvalue.com/v1/chat-messages',
'auth_token': 'Bearer app-6HIbbN1vV4oerlKAwd99KXh6'
},
{
'url': 'https://agent2.idgvalue.com/v1/chat-messages',
'auth_token': 'Bearer app-WG4dQBxfqpI2r6Zha2csymp7'
},
]
try:
flag = True
@ -194,30 +320,22 @@ def main():
try:
# 从数据库获取URL列表
list = db_manager.get_urls_from_database(custom_query)
list_items = db_manager.get_urls_from_database(custom_query)
if not list:
if not list_items:
logger.warning("未获取到URL")
return
# 遍历每个URL调用API
for i, text in enumerate(list, 1):
logger.info(f"处理第 {i}/{len(list)} 个数据: {text}")
# 创建多API客户端支持负载均衡和并发调用
multi_api_client = MultiAPIClient(api_servers, max_workers=4)
# 方法1: 使用带inputs参数的调用
inputs_data = {}
query = text
# 并发处理所有数据
logger.info(f"开始处理 {len(list_items)} 个数据项")
results = multi_api_client.process_batch(list_items)
result = api_client.call_api_with_inputs(inputs_data,query)
success_count = sum(1 for r in results if r.get('success', False))
logger.info(f"处理完成,成功: {success_count}/{len(results)}")
if result['success']:
logger.info(f"URL {text} 处理成功")
else:
logger.error(f"URL {text} 处理失败: {result.get('error')}")
# 可选:添加延迟避免请求过于频繁
if i < len(text):
time.sleep(1)
except Exception as e:
logger.error(f"程序执行失败: {e}")
@ -225,6 +343,8 @@ def main():
db_manager.disconnect()
else:
# 单个API客户端示例
api_client = APIClient(api_servers[0]['url'], api_servers[0]['auth_token'])
logger.info("调用不带inputs参数的API示例")
result2 = api_client.call_api_without_inputs()