add workflow 客户信息补全【156,dev
This commit is contained in:
parent
333041ab1b
commit
9722349fd7
|
|
@ -5,6 +5,8 @@ from psycopg2.extras import RealDictCursor
|
|||
import time
|
||||
from typing import List, Dict, Any
|
||||
import logging
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import threading
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
|
|
@ -37,6 +39,7 @@ class APIClient:
|
|||
"response_mode": "blocking",
|
||||
"user": "admin"
|
||||
})
|
||||
logger.info(f"调用API,payload: {payload}")
|
||||
|
||||
try:
|
||||
logger.info("调用带inputs参数的API")
|
||||
|
|
@ -44,7 +47,7 @@ class APIClient:
|
|||
self.api_url,
|
||||
headers=self.headers,
|
||||
data=payload,
|
||||
timeout=1200
|
||||
timeout=2400
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
|
|
@ -63,6 +66,7 @@ class APIClient:
|
|||
"error": str(e)
|
||||
}
|
||||
|
||||
|
||||
def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
|
||||
"""直接调用不带inputs参数的API
|
||||
|
||||
|
|
@ -244,10 +248,48 @@ class DatabaseManager:
|
|||
except Exception as e:
|
||||
logger.error(f"查询数据库失败: {e}")
|
||||
return []
|
||||
|
||||
def process_single_item(api_client, item_text, type, item_index, total_count):
    """Send one item to the API and log whether the call succeeded.

    Args:
        api_client: Object providing ``call_api_with_inputs`` and
            ``call_api_with_query``; each is expected to return a dict
            with a ``'success'`` key and, on failure, an ``'error'`` key.
        item_text: The data item to submit.
        type: ``'workflow'`` sends the item as the ``'companys'`` input;
            any other value sends it as a query (agent mode).
        item_index: Position of this item, used only in the log message.
        total_count: Total number of items, used only in the log message.

    Returns:
        The dict returned by the API client, or
        ``{"success": False, "error": <message>}`` if an exception was
        raised while calling the client or inspecting its result.
    """
    logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item_text}")

    try:
        if type == 'workflow':
            # Workflow mode: the item is wrapped in an inputs mapping.
            outcome = api_client.call_api_with_inputs({'companys': f"{item_text}"})
        else:
            # Agent mode: the item itself is the query.
            outcome = api_client.call_api_with_query(item_text)

        # Both modes report the outcome the same way.
        if outcome['success']:
            logger.info(f"数据 {item_text} 处理成功")
        else:
            logger.error(f"数据 {item_text} 处理失败: {outcome.get('error')}")
        return outcome

    except Exception as e:
        # Never let one item's failure escape to the caller; return it as
        # a failed result so it can be counted.
        logger.error(f"处理数据 {item_text} 时发生异常: {e}")
        return {"success": False, "error": str(e)}
|
||||
|
||||
#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp
|
||||
#DATABASE_SCHEMA=p70_ai_intelligence
|
||||
def main():
|
||||
"""主函数"""
|
||||
# 配置并发数
|
||||
MAX_WORKERS = 10 # 可调整为5或10
|
||||
|
||||
# 数据库配置
|
||||
db_config = {
|
||||
'host': '124.221.232.219',
|
||||
|
|
@ -260,8 +302,8 @@ def main():
|
|||
|
||||
# API配置
|
||||
api_config = {
|
||||
'url': 'https://tk-agent.idgvalue.com/v1/workflows/run',
|
||||
'auth_token': 'Bearer app-pv9rLM4ukTRayP2YD60cS4k5'
|
||||
'url': 'https://agent.idgvalue.com/v1/workflows/run',
|
||||
'auth_token': 'Bearer app-AXKHfJwq6SdZrRMmULh8xONU'
|
||||
}
|
||||
|
||||
api_client = APIClient(api_config['url'], api_config['auth_token'])
|
||||
|
|
@ -275,7 +317,31 @@ def main():
|
|||
if flag:
|
||||
# 初始化
|
||||
db_manager = DatabaseManager(db_config)
|
||||
custom_query = """ ${custom_query} """
|
||||
custom_query = """
|
||||
WITH numbered_names AS (
|
||||
SELECT
|
||||
name,
|
||||
(ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /10 as batch_num
|
||||
FROM p30_common.v_sql_cleaned_cn_d_account_info
|
||||
WHERE name NOT IN (
|
||||
SELECT "search"
|
||||
FROM p70_ai_intelligence.agent_account_info
|
||||
WHERE "search" IS NOT NULL
|
||||
)
|
||||
and name NOT IN (
|
||||
SELECT name
|
||||
FROM p70_ai_intelligence.agent_execp_account
|
||||
WHERE name IS NOT NULL
|
||||
)
|
||||
limit 100
|
||||
)
|
||||
SELECT
|
||||
STRING_AGG(name, E'\n') as batched_names
|
||||
FROM numbered_names
|
||||
GROUP BY batch_num
|
||||
ORDER BY batch_num;
|
||||
|
||||
"""
|
||||
|
||||
try:
|
||||
# 从数据库获取URL列表
|
||||
|
|
@ -285,37 +351,39 @@ def main():
|
|||
logger.warning("未获取到URL")
|
||||
return
|
||||
|
||||
# 遍历每个URL调用API
|
||||
for i, text in enumerate(list, 1):
|
||||
logger.info(f"处理第 {i}/{len(list)} 个数据: {text}")
|
||||
# 展平所有数据项为单个列表
|
||||
# all_items = []
|
||||
# for batch in list:
|
||||
# items = batch.split('\n')
|
||||
# all_items.extend(items)
|
||||
|
||||
logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程")
|
||||
|
||||
# 使用线程池并发处理
|
||||
success_count = 0
|
||||
failed_count = 0
|
||||
|
||||
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
|
||||
# 提交所有任务
|
||||
future_to_item = {
|
||||
executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item
|
||||
for i, item in enumerate(list)
|
||||
}
|
||||
|
||||
if type is 'workflow':
|
||||
# 方法1: 使用带inputs参数的调用
|
||||
inputs_data = {
|
||||
"urls": f"{text}"
|
||||
}
|
||||
|
||||
result = api_client.call_api_with_inputs(inputs_data)
|
||||
|
||||
if result['success']:
|
||||
logger.info(f"URL {text} 处理成功")
|
||||
else:
|
||||
logger.error(f"URL {text} 处理失败: {result.get('error')}")
|
||||
|
||||
|
||||
else:
|
||||
#agent
|
||||
query = text
|
||||
result = api_client.call_api_with_query(query)
|
||||
|
||||
if result['success']:
|
||||
logger.info(f"URL {text} 处理成功")
|
||||
else:
|
||||
logger.error(f"URL {text} 处理失败: {result.get('error')}")
|
||||
|
||||
# 可选:添加延迟避免请求过于频繁
|
||||
if i < len(text):
|
||||
time.sleep(1)
|
||||
# 收集结果
|
||||
for future in as_completed(future_to_item):
|
||||
item = future_to_item[future]
|
||||
try:
|
||||
result = future.result()
|
||||
if result.get("success"):
|
||||
success_count += 1
|
||||
else:
|
||||
failed_count += 1
|
||||
except Exception as e:
|
||||
logger.error(f"处理项目 {item} 时发生异常: {e}")
|
||||
failed_count += 1
|
||||
|
||||
logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"程序执行失败: {e}")
|
||||
|
|
@ -325,7 +393,7 @@ def main():
|
|||
|
||||
else:
|
||||
logger.info("调用不带inputs参数的API示例")
|
||||
if type is 'workflow':
|
||||
if type == 'workflow':
|
||||
result2 = api_client.call_api_without_inputs()
|
||||
|
||||
if result2['success']:
|
||||
|
|
|
|||
Loading…
Reference in New Issue