add workflow Append-客户信息补全,dev

This commit is contained in:
root 2025-12-08 16:45:50 +08:00
parent 5c7be890d8
commit aea9d4c40e
1 changed files with 47 additions and 107 deletions

View File

@ -5,8 +5,6 @@ from psycopg2.extras import RealDictCursor
import time
from typing import List, Dict, Any
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# 配置日志
logging.basicConfig(
@ -34,41 +32,30 @@ class APIClient:
Returns:
API响应数据
"""
payload = {
payload = json.dumps({
"inputs": inputs,
"response_mode": "blocking",
"user": "admin"
}
logger.info(f"调用APIpayload: {json.dumps(payload, ensure_ascii=False)}")
})
try:
logger.info("调用带inputs参数的API")
response = requests.post(
self.api_url,
headers=self.headers,
json=payload, # Using json parameter instead of data to let requests handle serialization
timeout=300 # Reduced timeout to more reasonable value
data=payload,
timeout=1200
)
logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}")
return {
"success": True,
"status_code": response.status_code,
"data": response.json() if response.content else {}
"data": response.json()
}
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return {
"success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code
}
except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}")
return {
@ -76,7 +63,6 @@ class APIClient:
"error": str(e)
}
def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
"""直接调用不带inputs参数的API
@ -102,7 +88,7 @@ class APIClient:
self.api_url,
headers=self.headers,
data=json.dumps(payload),
timeout=300
timeout=1200
)
response.raise_for_status()
@ -144,7 +130,7 @@ class APIClient:
self.api_url,
headers=self.headers,
data=payload,
timeout=300
timeout=2400
)
response.raise_for_status()
@ -237,8 +223,8 @@ class DatabaseManager:
self.connection.close()
logger.info("数据库连接已关闭")
def get_urls_from_database(self, query: str = None) -> List[Dict[str, Any]]:
"""从数据库获取数据列表"""
def get_urls_from_database(self, query: str = None) -> List[str]:
"""从数据库获取URL列表"""
if not self.connection:
self.connect()
@ -251,60 +237,17 @@ class DatabaseManager:
cursor.execute(query)
results = cursor.fetchall()
# 直接返回字典列表,每个字典包含 id 和 address
return [dict(row) for row in results]
# 处理结果,返回字符串列表
datas = [list(row.values())[0] for row in results]
return datas
except Exception as e:
logger.error(f"查询数据库失败: {e}")
return []
def process_single_item(api_client, item, type, item_index, total_count):
    """Send a single record to the API and log whether it succeeded.

    Args:
        api_client: Object exposing ``call_api_with_inputs(inputs)`` and
            ``call_api_with_query(query)``; whichever is called must return
            a mapping that has a ``'success'`` key.
        item: Mapping read through ``item.get(...)``. In workflow mode the
            keys ``company1/address1/city1/company2/address2/city2`` are
            read; otherwise ``id`` and ``address`` are read.
        type: ``'workflow'`` selects the inputs-based call; any other value
            selects the query-based (agent) call.
        item_index: Position of this record, used only in the start log line.
        total_count: Total number of records, used only in the start log line.

    Returns:
        The mapping returned by the API client, or
        ``{"success": False, "error": str(e)}`` when any exception is raised
        while building the request, calling the client, or reading the result.
    """
    logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item}")

    # Field names forwarded to the workflow, in the order they are sent.
    workflow_fields = (
        'company1', 'address1', 'city1',
        'company2', 'address2', 'city2',
    )

    try:
        if type == 'workflow':
            # Every field is rendered to text; a missing key becomes ''.
            payload = {
                field: f"{item.get(field, '')}" for field in workflow_fields
            }
            outcome = api_client.call_api_with_inputs(payload)
        else:
            # Agent mode: flatten the record into one query string.
            outcome = api_client.call_api_with_query(
                f"ID: {item.get('id', '')}, 地址: {item.get('address', '')}"
            )

        # Both modes report the outcome the same way.
        if not outcome['success']:
            logger.error(f"数据 {item} 处理失败: {outcome.get('error')}")
        else:
            logger.info(f"数据 {item} 处理成功")
        return outcome
    except Exception as e:
        # Deliberate catch-all: one bad record must not abort the batch.
        logger.error(f"处理数据 {item} 时发生异常: {e}")
        return {"success": False, "error": str(e)}
#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp
#DATABASE_SCHEMA=p70_ai_intelligence
def main():
"""主函数"""
# 配置并发数
MAX_WORKERS = 10 # 可调整为5或10
# 数据库配置
db_config = {
'host': '124.221.232.219',
@ -332,8 +275,7 @@ def main():
if flag:
# 初始化
db_manager = DatabaseManager(db_config)
custom_query = """
SELECT
custom_query = """ SELECT
t2.name as company1,
t2.city_name as city1,
t2.address_detail as address1,
@ -345,11 +287,11 @@ t1.address as address2
FROM p70_ai_intelligence.agent_account_info t1
left join p30_common.v_sql_cleaned_cn_d_account_info t2
on t2."name" = t1."search"
where t1.city <> t2.city_name
where ((t1.city not like t2.city_name||'%' and t2.city_name <>'') or t1.account not like t2.name||'%')
and t2.name not in (select search from p70_ai_intelligence.agent_account_info_append)
limit 100
"""
"""
try:
# 从数据库获取URL列表
@ -359,39 +301,37 @@ t1.address as address2
logger.warning("未获取到URL")
return
# 展平所有数据项为单个列表
# all_items = []
# for batch in list:
# items = batch.split('\n')
# all_items.extend(items)
logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程")
# 使用线程池并发处理
success_count = 0
failed_count = 0
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 提交所有任务
future_to_item = {
executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item
for i, item in enumerate(list)
}
# 遍历每个URL调用API
for i, text in enumerate(list, 1):
logger.info(f"处理第 {i}/{len(list)} 个数据: {text}")
# 收集结果
for future in as_completed(future_to_item):
item = future_to_item[future]
try:
result = future.result()
if result.get("success"):
success_count += 1
else:
failed_count += 1
except Exception as e:
logger.error(f"处理项目 {item} 时发生异常: {e}")
failed_count += 1
logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}")
if type is 'workflow':
# 方法1: 使用带inputs参数的调用
inputs_data = {
"urls": f"{text}"
}
result = api_client.call_api_with_inputs(inputs_data)
if result['success']:
logger.info(f"URL {text} 处理成功")
else:
logger.error(f"URL {text} 处理失败: {result.get('error')}")
else:
#agent
query = text
result = api_client.call_api_with_query(query)
if result['success']:
logger.info(f"URL {text} 处理成功")
else:
logger.error(f"URL {text} 处理失败: {result.get('error')}")
# 可选:添加延迟避免请求过于频繁
if i < len(text):
time.sleep(1)
except Exception as e:
logger.error(f"程序执行失败: {e}")
@ -401,7 +341,7 @@ t1.address as address2
else:
logger.info("调用不带inputs参数的API示例")
if type == 'workflow':
if type is 'workflow':
result2 = api_client.call_api_without_inputs()
if result2['success']: