add workflow 客户信息补全【156,dev

This commit is contained in:
root 2025-12-08 11:08:39 +08:00
parent 3e0792e49d
commit 4778c8050c
4 changed files with 1153 additions and 891 deletions

View File

@ -5,6 +5,8 @@ from psycopg2.extras import RealDictCursor
import time import time
from typing import List, Dict, Any from typing import List, Dict, Any
import logging import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# 配置日志 # 配置日志
logging.basicConfig( logging.basicConfig(
@ -32,30 +34,41 @@ class APIClient:
Returns: Returns:
API响应数据 API响应数据
""" """
payload = json.dumps({ payload = {
"inputs": inputs, "inputs": inputs,
"response_mode": "blocking", "response_mode": "blocking",
"user": "admin" "user": "admin"
}) }
logger.info(f"调用APIpayload: {json.dumps(payload, ensure_ascii=False)}")
try: try:
logger.info("调用带inputs参数的API") logger.info("调用带inputs参数的API")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
data=payload, json=payload, # Using json parameter instead of data to let requests handle serialization
timeout=1200 timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return {
"success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code
}
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
@ -63,6 +76,7 @@ class APIClient:
"error": str(e) "error": str(e)
} }
def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
"""直接调用不带inputs参数的API """直接调用不带inputs参数的API
@ -83,23 +97,33 @@ class APIClient:
payload.update(other_params) payload.update(other_params)
try: try:
logger.info("调用不带inputs参数的API") logger.info(f"调用不带inputs参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
data=json.dumps(payload), json=payload, # Using json parameter instead of data
timeout=1200 timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return {
"success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code
}
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
@ -117,31 +141,41 @@ class APIClient:
Returns: Returns:
API响应数据 API响应数据
""" """
payload = json.dumps({ payload = {
"inputs": {}, "inputs": {},
"query" : query, "query" : query,
"response_mode": "streaming", "response_mode": "blocking", # Changed from streaming to blocking for consistency
"user": "admin" "user": "admin"
}) }
try: try:
logger.info("调用带inputs参数的API") logger.info(f"调用带query参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
data=payload, json=payload, # Using json parameter instead of data
timeout=2400 timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return {
"success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code
}
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
@ -161,7 +195,7 @@ class APIClient:
payload = { payload = {
"inputs": {}, "inputs": {},
"query":"", "query":"",
"response_mode": "streaming", "response_mode": "blocking", # Changed from streaming to blocking for consistency
"user": "admin" "user": "admin"
} }
@ -170,23 +204,33 @@ class APIClient:
payload.update(other_params) payload.update(other_params)
try: try:
logger.info("调用不带inputs参数的API") logger.info(f"调用不带query参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
data=json.dumps(payload), json=payload, # Using json parameter instead of data
timeout=2400 timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return {
"success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code
}
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
@ -244,10 +288,49 @@ class DatabaseManager:
except Exception as e: except Exception as e:
logger.error(f"查询数据库失败: {e}") logger.error(f"查询数据库失败: {e}")
return [] return []
def process_single_item(api_client, item_text, type, item_index, total_count):
"""处理单个项目"""
logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item_text}")
try:
if type == 'workflow':
# 方法1: 使用带inputs参数的调用
inputs_data = {
'companys': f"{item_text}"
}
print(inputs_data)
result = api_client.call_api_with_inputs(inputs_data)
if result['success']:
logger.info(f"数据 {item_text} 处理成功")
else:
logger.error(f"数据 {item_text} 处理失败: {result.get('error')}")
return result
else:
#agent
query = item_text
result = api_client.call_api_with_query(query)
if result['success']:
logger.info(f"数据 {item_text} 处理成功")
else:
logger.error(f"数据 {item_text} 处理失败: {result.get('error')}")
return result
except Exception as e:
logger.error(f"处理数据 {item_text} 时发生异常: {e}")
return {"success": False, "error": str(e)}
#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp #DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp
#DATABASE_SCHEMA=p70_ai_intelligence #DATABASE_SCHEMA=p70_ai_intelligence
def main(): def main():
"""主函数""" """主函数"""
# 配置并发数
MAX_WORKERS = 3 # 可调整为5或10
# 数据库配置 # 数据库配置
db_config = { db_config = {
'host': '124.221.232.219', 'host': '124.221.232.219',
@ -260,13 +343,12 @@ def main():
# API配置 # API配置
api_config = { api_config = {
'url': 'https://tk-agent.idgvalue.com/v1/workflows/run', 'url': 'http://10.168.1.153:18000/v1/workflows/run',
'auth_token': 'Bearer app-pv9rLM4ukTRayP2YD60cS4k5' 'auth_token': 'Bearer app-JYQCHu09hlZn0b0OUVW3PRdr'
} }
api_client = APIClient(api_config['url'], api_config['auth_token']) api_client = APIClient(api_config['url'], api_config['auth_token'])
type = 'agent'
type = 'workflow' type = 'workflow'
try: try:
@ -275,7 +357,33 @@ def main():
if flag: if flag:
# 初始化 # 初始化
db_manager = DatabaseManager(db_config) db_manager = DatabaseManager(db_config)
custom_query = """ ${custom_query} """ custom_query = """
WITH numbered_names AS (
SELECT
name,
(ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /5 as batch_num
FROM p30_common.v_sql_cleaned_cn_d_account_info
WHERE name NOT IN (
SELECT "search"
FROM p70_ai_intelligence.agent_account_info
WHERE "search" IS NOT NULL
)
and name NOT IN (
SELECT name
FROM p70_ai_intelligence.agent_execp_account
WHERE name IS NOT NULL
)
order by dw_account
offset 2250
limit 4
)
SELECT
STRING_AGG(name, E'\n') as batched_names
FROM numbered_names
GROUP BY batch_num
ORDER BY batch_num;
"""
try: try:
# 从数据库获取URL列表 # 从数据库获取URL列表
@ -285,37 +393,39 @@ def main():
logger.warning("未获取到URL") logger.warning("未获取到URL")
return return
# 遍历每个URL调用API # 展平所有数据项为单个列表
for i, text in enumerate(list, 1): # all_items = []
logger.info(f"处理第 {i}/{len(list)} 个数据: {text}") # for batch in list:
# items = batch.split('\n')
# all_items.extend(items)
logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程")
# 使用线程池并发处理
success_count = 0
failed_count = 0
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 提交所有任务
future_to_item = {
executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item
for i, item in enumerate(list)
}
if type is 'workflow': # 收集结果
# 方法1: 使用带inputs参数的调用 for future in as_completed(future_to_item):
inputs_data = { item = future_to_item[future]
"urls": f"{text}" try:
} result = future.result()
if result.get("success"):
result = api_client.call_api_with_inputs(inputs_data) success_count += 1
else:
if result['success']: failed_count += 1
logger.info(f"URL {text} 处理成功") except Exception as e:
else: logger.error(f"处理项目 {item} 时发生异常: {e}")
logger.error(f"URL {text} 处理失败: {result.get('error')}") failed_count += 1
logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}")
else:
#agent
query = text
result = api_client.call_api_with_query(query)
if result['success']:
logger.info(f"URL {text} 处理成功")
else:
logger.error(f"URL {text} 处理失败: {result.get('error')}")
# 可选:添加延迟避免请求过于频繁
if i < len(text):
time.sleep(1)
except Exception as e: except Exception as e:
logger.error(f"程序执行失败: {e}") logger.error(f"程序执行失败: {e}")
@ -325,7 +435,7 @@ def main():
else: else:
logger.info("调用不带inputs参数的API示例") logger.info("调用不带inputs参数的API示例")
if type is 'workflow': if type == 'workflow':
result2 = api_client.call_api_without_inputs() result2 = api_client.call_api_without_inputs()
if result2['success']: if result2['success']:

View File

@ -34,31 +34,41 @@ class APIClient:
Returns: Returns:
API响应数据 API响应数据
""" """
payload = json.dumps({ payload = {
"inputs": inputs, "inputs": inputs,
"response_mode": "blocking", "response_mode": "blocking",
"user": "admin" "user": "admin"
}) }
logger.info(f"调用APIpayload: {payload}") logger.info(f"调用APIpayload: {json.dumps(payload, ensure_ascii=False)}")
try: try:
logger.info("调用带inputs参数的API") logger.info("调用带inputs参数的API")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
data=payload, json=payload, # Using json parameter instead of data to let requests handle serialization
timeout=2400 timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return {
"success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code
}
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
@ -87,23 +97,33 @@ class APIClient:
payload.update(other_params) payload.update(other_params)
try: try:
logger.info("调用不带inputs参数的API") logger.info(f"调用不带inputs参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
data=json.dumps(payload), json=payload, # Using json parameter instead of data
timeout=1200 timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return {
"success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code
}
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
@ -121,31 +141,41 @@ class APIClient:
Returns: Returns:
API响应数据 API响应数据
""" """
payload = json.dumps({ payload = {
"inputs": {}, "inputs": {},
"query" : query, "query" : query,
"response_mode": "streaming", "response_mode": "blocking", # Changed from streaming to blocking for consistency
"user": "admin" "user": "admin"
}) }
try: try:
logger.info("调用带inputs参数的API") logger.info(f"调用带query参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
data=payload, json=payload, # Using json parameter instead of data
timeout=2400 timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return {
"success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code
}
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
@ -165,7 +195,7 @@ class APIClient:
payload = { payload = {
"inputs": {}, "inputs": {},
"query":"", "query":"",
"response_mode": "streaming", "response_mode": "blocking", # Changed from streaming to blocking for consistency
"user": "admin" "user": "admin"
} }
@ -174,23 +204,33 @@ class APIClient:
payload.update(other_params) payload.update(other_params)
try: try:
logger.info("调用不带inputs参数的API") logger.info(f"调用不带query参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
data=json.dumps(payload), json=payload, # Using json parameter instead of data
timeout=2400 timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return {
"success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code
}
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
@ -259,6 +299,7 @@ def process_single_item(api_client, item_text, type, item_index, total_count):
inputs_data = { inputs_data = {
'companys': f"{item_text}" 'companys': f"{item_text}"
} }
print(inputs_data)
result = api_client.call_api_with_inputs(inputs_data) result = api_client.call_api_with_inputs(inputs_data)
@ -288,7 +329,7 @@ def process_single_item(api_client, item_text, type, item_index, total_count):
def main(): def main():
"""主函数""" """主函数"""
# 配置并发数 # 配置并发数
MAX_WORKERS = 10 # 可调整为5或10 MAX_WORKERS = 3 # 可调整为5或10
# 数据库配置 # 数据库配置
db_config = { db_config = {
@ -308,7 +349,6 @@ def main():
api_client = APIClient(api_config['url'], api_config['auth_token']) api_client = APIClient(api_config['url'], api_config['auth_token'])
type = 'agent'
type = 'workflow' type = 'workflow'
try: try:
@ -321,7 +361,7 @@ def main():
WITH numbered_names AS ( WITH numbered_names AS (
SELECT SELECT
name, name,
(ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /10 as batch_num (ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /5 as batch_num
FROM p30_common.v_sql_cleaned_cn_d_account_info FROM p30_common.v_sql_cleaned_cn_d_account_info
WHERE name NOT IN ( WHERE name NOT IN (
SELECT "search" SELECT "search"
@ -333,7 +373,9 @@ WITH numbered_names AS (
FROM p70_ai_intelligence.agent_execp_account FROM p70_ai_intelligence.agent_execp_account
WHERE name IS NOT NULL WHERE name IS NOT NULL
) )
limit 100 order by dw_account
offset 750
limit 5
) )
SELECT SELECT
STRING_AGG(name, E'\n') as batched_names STRING_AGG(name, E'\n') as batched_names

View File

@ -1,351 +1,461 @@
import requests import requests
import json import json
import psycopg2 import psycopg2
from psycopg2.extras import RealDictCursor from psycopg2.extras import RealDictCursor
import time import time
from typing import List, Dict, Any from typing import List, Dict, Any
import logging import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
# 配置日志 import threading
logging.basicConfig(
level=logging.INFO, # 配置日志
format='%(asctime)s - %(levelname)s - %(message)s' logging.basicConfig(
) level=logging.INFO,
logger = logging.getLogger(__name__) format='%(asctime)s - %(levelname)s - %(message)s'
)
class APIClient: logger = logging.getLogger(__name__)
"""简化的API客户端类"""
class APIClient:
def __init__(self, api_url: str, auth_token: str): """简化的API客户端类"""
self.api_url = api_url
self.headers = { def __init__(self, api_url: str, auth_token: str):
'Authorization': auth_token, self.api_url = api_url
'Content-Type': 'application/json' self.headers = {
} 'Authorization': auth_token,
'Content-Type': 'application/json'
def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]: }
"""直接调用带inputs参数的API
def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
Args: """直接调用带inputs参数的API
inputs: inputs参数字典
Args:
Returns: inputs: inputs参数字典
API响应数据
""" Returns:
payload = json.dumps({ API响应数据
"inputs": inputs, """
"response_mode": "blocking", payload = {
"user": "admin" "inputs": inputs,
}) "response_mode": "blocking",
"user": "admin"
try: }
logger.info("调用带inputs参数的API") logger.info(f"调用APIpayload: {json.dumps(payload, ensure_ascii=False)}")
response = requests.post(
self.api_url, try:
headers=self.headers, logger.info("调用带inputs参数的API")
data=payload, response = requests.post(
timeout=1200 self.api_url,
) headers=self.headers,
json=payload, # Using json parameter instead of data to let requests handle serialization
response.raise_for_status() timeout=300 # Reduced timeout to more reasonable value
logger.info(f"API调用成功状态码: {response.status_code}") )
return { logger.info(f"API调用完成状态码: {response.status_code}")
"success": True, logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
"status_code": response.status_code,
"data": response.json() response.raise_for_status()
} logger.info(f"API调用成功状态码: {response.status_code}")
except requests.exceptions.RequestException as e: return {
logger.error(f"API调用失败: {e}") "success": True,
return { "status_code": response.status_code,
"success": False, "data": response.json() if response.content else {}
"error": str(e) }
}
except requests.exceptions.HTTPError as e:
def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
"""直接调用不带inputs参数的API return {
"success": False,
Args: "error": f"HTTP {e.response.status_code}: {e.response.text}",
other_params: 其他参数字典 "status_code": e.response.status_code
}
Returns: except requests.exceptions.RequestException as e:
API响应数据 logger.error(f"API调用失败: {e}")
""" return {
payload = { "success": False,
"inputs": {}, "error": str(e)
"response_mode": "blocking", }
"user": "admin"
}
def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
# 添加其他参数 """直接调用不带inputs参数的API
if other_params:
payload.update(other_params) Args:
other_params: 其他参数字典
try:
logger.info("调用不带inputs参数的API") Returns:
response = requests.post( API响应数据
self.api_url, """
headers=self.headers, payload = {
data=json.dumps(payload), "inputs": {},
timeout=1200 "response_mode": "blocking",
) "user": "admin"
}
response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") # 添加其他参数
if other_params:
return { payload.update(other_params)
"success": True,
"status_code": response.status_code, try:
"data": response.json() logger.info(f"调用不带inputs参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
} response = requests.post(
self.api_url,
except requests.exceptions.RequestException as e: headers=self.headers,
logger.error(f"API调用失败: {e}") json=payload, # Using json parameter instead of data
return { timeout=300 # Reduced timeout to more reasonable value
"success": False, )
"error": str(e)
} logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
def call_api_with_query(self, query: str = None) -> Dict[str, Any]: response.raise_for_status()
"""直接调用带inputs参数的API logger.info(f"API调用成功状态码: {response.status_code}")
Args: return {
query: 查询语句 "success": True,
"status_code": response.status_code,
Returns: "data": response.json() if response.content else {}
API响应数据 }
"""
payload = json.dumps({ except requests.exceptions.HTTPError as e:
"inputs": {}, logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
"query" : query, return {
"response_mode": "streaming", "success": False,
"user": "admin" "error": f"HTTP {e.response.status_code}: {e.response.text}",
}) "status_code": e.response.status_code
}
try: except requests.exceptions.RequestException as e:
logger.info("调用带inputs参数的API") logger.error(f"API调用失败: {e}")
response = requests.post( return {
self.api_url, "success": False,
headers=self.headers, "error": str(e)
data=payload, }
timeout=2400
)
def call_api_with_query(self, query: str = None) -> Dict[str, Any]:
response.raise_for_status() """直接调用带inputs参数的API
logger.info(f"API调用成功状态码: {response.status_code}")
Args:
return { query: 查询语句
"success": True,
"status_code": response.status_code, Returns:
"data": response.json() API响应数据
} """
payload = {
except requests.exceptions.RequestException as e: "inputs": {},
logger.error(f"API调用失败: {e}") "query" : query,
return { "response_mode": "blocking", # Changed from streaming to blocking for consistency
"success": False, "user": "admin"
"error": str(e) }
}
try:
def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: logger.info(f"调用带query参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
"""直接调用不带inputs参数的API response = requests.post(
self.api_url,
Args: headers=self.headers,
other_params: 其他参数字典 json=payload, # Using json parameter instead of data
timeout=300 # Reduced timeout to more reasonable value
Returns: )
API响应数据
""" logger.info(f"API调用完成状态码: {response.status_code}")
payload = { logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
"inputs": {},
"query":"", response.raise_for_status()
"response_mode": "streaming", logger.info(f"API调用成功状态码: {response.status_code}")
"user": "admin"
} return {
"success": True,
# 添加其他参数 "status_code": response.status_code,
if other_params: "data": response.json() if response.content else {}
payload.update(other_params) }
try: except requests.exceptions.HTTPError as e:
logger.info("调用不带inputs参数的API") logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
response = requests.post( return {
self.api_url, "success": False,
headers=self.headers, "error": f"HTTP {e.response.status_code}: {e.response.text}",
data=json.dumps(payload), "status_code": e.response.status_code
timeout=2400 }
) except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}")
response.raise_for_status() return {
logger.info(f"API调用成功状态码: {response.status_code}") "success": False,
"error": str(e)
return { }
"success": True,
"status_code": response.status_code, def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
"data": response.json() """直接调用不带inputs参数的API
}
Args:
except requests.exceptions.RequestException as e: other_params: 其他参数字典
logger.error(f"API调用失败: {e}")
return { Returns:
"success": False, API响应数据
"error": str(e) """
} payload = {
"inputs": {},
class DatabaseManager: "query":"",
"""数据库管理类""" "response_mode": "blocking", # Changed from streaming to blocking for consistency
"user": "admin"
def __init__(self, db_config: Dict[str, str]): }
self.db_config = db_config
self.connection = None # 添加其他参数
if other_params:
def connect(self): payload.update(other_params)
"""连接数据库"""
try: try:
self.connection = psycopg2.connect( logger.info(f"调用不带query参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
host=self.db_config.get('host', 'localhost'), response = requests.post(
port=self.db_config.get('port', '5432'), self.api_url,
database=self.db_config.get('database', 'postgres'), headers=self.headers,
user=self.db_config.get('user', 'postgres'), json=payload, # Using json parameter instead of data
password=self.db_config.get('password', ''), timeout=300 # Reduced timeout to more reasonable value
cursor_factory=RealDictCursor )
)
logger.info("数据库连接成功") logger.info(f"API调用完成状态码: {response.status_code}")
except Exception as e: logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
logger.error(f"数据库连接失败: {e}")
raise response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}")
def disconnect(self):
"""断开数据库连接""" return {
if self.connection: "success": True,
self.connection.close() "status_code": response.status_code,
logger.info("数据库连接已关闭") "data": response.json() if response.content else {}
}
def get_urls_from_database(self, query: str = None) -> List[str]:
"""从数据库获取URL列表""" except requests.exceptions.HTTPError as e:
if not self.connection: logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
self.connect() return {
"success": False,
# 默认查询语句,根据实际情况修改 "error": f"HTTP {e.response.status_code}: {e.response.text}",
if query is None: "status_code": e.response.status_code
query = "SELECT 1" }
except requests.exceptions.RequestException as e:
try: logger.error(f"API调用失败: {e}")
with self.connection.cursor() as cursor: return {
cursor.execute(query) "success": False,
results = cursor.fetchall() "error": str(e)
}
# 处理结果,返回字符串列表
datas = [list(row.values())[0] for row in results] class DatabaseManager:
return datas """数据库管理类"""
except Exception as e: def __init__(self, db_config: Dict[str, str]):
logger.error(f"查询数据库失败: {e}") self.db_config = db_config
return [] self.connection = None
#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp
#DATABASE_SCHEMA=p70_ai_intelligence def connect(self):
def main(): """连接数据库"""
"""主函数""" try:
# 数据库配置 self.connection = psycopg2.connect(
db_config = { host=self.db_config.get('host', 'localhost'),
'host': '124.221.232.219', port=self.db_config.get('port', '5432'),
'port': '5432', database=self.db_config.get('database', 'postgres'),
'database': 'daas_mpp', user=self.db_config.get('user', 'postgres'),
'user': 'dbuser_dba', password=self.db_config.get('password', ''),
'password': 'EmBRxnmmjnE3', cursor_factory=RealDictCursor
'schema': 'p70_ai_intelligence' )
} logger.info("数据库连接成功")
except Exception as e:
# API配置 logger.error(f"数据库连接失败: {e}")
api_config = { raise
'url': 'https://tk-agent.idgvalue.com/v1/workflows/run',
'auth_token': 'Bearer app-pv9rLM4ukTRayP2YD60cS4k5' def disconnect(self):
} """断开数据库连接"""
if self.connection:
api_client = APIClient(api_config['url'], api_config['auth_token']) self.connection.close()
logger.info("数据库连接已关闭")
type = 'agent'
type = 'workflow' def get_urls_from_database(self, query: str = None) -> List[str]:
"""从数据库获取URL列表"""
try: if not self.connection:
flag = True self.connect()
if flag: # 默认查询语句,根据实际情况修改
# 初始化 if query is None:
db_manager = DatabaseManager(db_config) query = "SELECT 1"
custom_query = """ ${custom_query} """
try:
try: with self.connection.cursor() as cursor:
# 从数据库获取URL列表 cursor.execute(query)
list = db_manager.get_urls_from_database(custom_query) results = cursor.fetchall()
if not list: # 处理结果,返回字符串列表
logger.warning("未获取到URL") datas = [list(row.values())[0] for row in results]
return return datas
# 遍历每个URL调用API except Exception as e:
for i, text in enumerate(list, 1): logger.error(f"查询数据库失败: {e}")
logger.info(f"处理第 {i}/{len(list)} 个数据: {text}") return []
if type is 'workflow': def process_single_item(api_client, item_text, type, item_index, total_count):
# 方法1: 使用带inputs参数的调用 """处理单个项目"""
inputs_data = { logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item_text}")
"urls": f"{text}"
} try:
if type == 'workflow':
result = api_client.call_api_with_inputs(inputs_data) # 方法1: 使用带inputs参数的调用
inputs_data = {
if result['success']: 'companys': f"{item_text}"
logger.info(f"URL {text} 处理成功") }
else: print(inputs_data)
logger.error(f"URL {text} 处理失败: {result.get('error')}")
result = api_client.call_api_with_inputs(inputs_data)
else: if result['success']:
#agent logger.info(f"数据 {item_text} 处理成功")
query = text else:
result = api_client.call_api_with_query(query) logger.error(f"数据 {item_text} 处理失败: {result.get('error')}")
return result
if result['success']:
logger.info(f"URL {text} 处理成功") else:
else: #agent
logger.error(f"URL {text} 处理失败: {result.get('error')}") query = item_text
result = api_client.call_api_with_query(query)
# 可选:添加延迟避免请求过于频繁
if i < len(text): if result['success']:
time.sleep(1) logger.info(f"数据 {item_text} 处理成功")
else:
except Exception as e: logger.error(f"数据 {item_text} 处理失败: {result.get('error')}")
logger.error(f"程序执行失败: {e}") return result
finally: except Exception as e:
db_manager.disconnect() logger.error(f"处理数据 {item_text} 时发生异常: {e}")
return {"success": False, "error": str(e)}
else:
logger.info("调用不带inputs参数的API示例") #DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp
if type is 'workflow': #DATABASE_SCHEMA=p70_ai_intelligence
result2 = api_client.call_api_without_inputs() def main():
"""主函数"""
if result2['success']: # 配置并发数
logger.info("不带inputs参数的API调用成功") MAX_WORKERS = 3 # 可调整为5或10
else:
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") # 数据库配置
db_config = {
else: 'host': '124.221.232.219',
#agent 'port': '5432',
result2 = api_client.call_api_without_query() 'database': 'daas_mpp',
'user': 'dbuser_dba',
if result2['success']: 'password': 'EmBRxnmmjnE3',
logger.info("不带inputs参数的API调用成功") 'schema': 'p70_ai_intelligence'
else: }
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
# API配置
except Exception as e: api_config = {
logger.error(f"初始化失败: {e}") 'url': 'http://10.168.1.162:18000/v1/workflows/run',
'auth_token': 'Bearer app-C0iPJse2Iutj7D6sngAv2eUv'
}
if __name__ == "__main__": api_client = APIClient(api_config['url'], api_config['auth_token'])
type = 'workflow'
try:
flag = True
if flag:
# 初始化
db_manager = DatabaseManager(db_config)
custom_query = """
WITH numbered_names AS (
SELECT
name,
(ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /5 as batch_num
FROM p30_common.v_sql_cleaned_cn_d_account_info
WHERE name NOT IN (
SELECT "search"
FROM p70_ai_intelligence.agent_account_info
WHERE "search" IS NOT NULL
)
and name NOT IN (
SELECT name
FROM p70_ai_intelligence.agent_execp_account
WHERE name IS NOT NULL
)
order by dw_account
offset 1500
limit 5
)
SELECT
STRING_AGG(name, E'\n') as batched_names
FROM numbered_names
GROUP BY batch_num
ORDER BY batch_num;
"""
try:
# 从数据库获取URL列表
list = db_manager.get_urls_from_database(custom_query)
if not list:
logger.warning("未获取到URL")
return
# 展平所有数据项为单个列表
# all_items = []
# for batch in list:
# items = batch.split('\n')
# all_items.extend(items)
logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程")
# 使用线程池并发处理
success_count = 0
failed_count = 0
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 提交所有任务
future_to_item = {
executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item
for i, item in enumerate(list)
}
# 收集结果
for future in as_completed(future_to_item):
item = future_to_item[future]
try:
result = future.result()
if result.get("success"):
success_count += 1
else:
failed_count += 1
except Exception as e:
logger.error(f"处理项目 {item} 时发生异常: {e}")
failed_count += 1
logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}")
except Exception as e:
logger.error(f"程序执行失败: {e}")
finally:
db_manager.disconnect()
else:
logger.info("调用不带inputs参数的API示例")
if type == 'workflow':
result2 = api_client.call_api_without_inputs()
if result2['success']:
logger.info("不带inputs参数的API调用成功")
else:
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
else:
#agent
result2 = api_client.call_api_without_query()
if result2['success']:
logger.info("不带inputs参数的API调用成功")
else:
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
except Exception as e:
logger.error(f"初始化失败: {e}")
if __name__ == "__main__":
main() main()

View File

@ -1,460 +1,460 @@
import requests import requests
import json import json
import psycopg2 import psycopg2
from psycopg2.extras import RealDictCursor from psycopg2.extras import RealDictCursor
import time import time
from typing import List, Dict, Any from typing import List, Dict, Any
import logging import logging
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
import threading import threading
# 配置日志 # 配置日志
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s' format='%(asctime)s - %(levelname)s - %(message)s'
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class APIClient: class APIClient:
"""简化的API客户端类""" """简化的API客户端类"""
def __init__(self, api_url: str, auth_token: str): def __init__(self, api_url: str, auth_token: str):
self.api_url = api_url self.api_url = api_url
self.headers = { self.headers = {
'Authorization': auth_token, 'Authorization': auth_token,
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]: def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""直接调用带inputs参数的API """直接调用带inputs参数的API
Args: Args:
inputs: inputs参数字典 inputs: inputs参数字典
Returns: Returns:
API响应数据 API响应数据
""" """
payload = { payload = {
"inputs": inputs, "inputs": inputs,
"response_mode": "blocking", "response_mode": "blocking",
"user": "admin" "user": "admin"
} }
logger.info(f"调用APIpayload: {json.dumps(payload, ensure_ascii=False)}") logger.info(f"调用APIpayload: {json.dumps(payload, ensure_ascii=False)}")
try: try:
logger.info("调用带inputs参数的API") logger.info("调用带inputs参数的API")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
json=payload, # Using json parameter instead of data to let requests handle serialization json=payload, # Using json parameter instead of data to let requests handle serialization
timeout=300 # Reduced timeout to more reasonable value timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}") logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() if response.content else {} "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return { return {
"success": False, "success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}", "error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code "status_code": e.response.status_code
} }
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
"success": False, "success": False,
"error": str(e) "error": str(e)
} }
def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
"""直接调用不带inputs参数的API """直接调用不带inputs参数的API
Args: Args:
other_params: 其他参数字典 other_params: 其他参数字典
Returns: Returns:
API响应数据 API响应数据
""" """
payload = { payload = {
"inputs": {}, "inputs": {},
"response_mode": "blocking", "response_mode": "blocking",
"user": "admin" "user": "admin"
} }
# 添加其他参数 # 添加其他参数
if other_params: if other_params:
payload.update(other_params) payload.update(other_params)
try: try:
logger.info(f"调用不带inputs参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}") logger.info(f"调用不带inputs参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
json=payload, # Using json parameter instead of data json=payload, # Using json parameter instead of data
timeout=300 # Reduced timeout to more reasonable value timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}") logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() if response.content else {} "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return { return {
"success": False, "success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}", "error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code "status_code": e.response.status_code
} }
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
"success": False, "success": False,
"error": str(e) "error": str(e)
} }
def call_api_with_query(self, query: str = None) -> Dict[str, Any]: def call_api_with_query(self, query: str = None) -> Dict[str, Any]:
"""直接调用带inputs参数的API """直接调用带inputs参数的API
Args: Args:
query: 查询语句 query: 查询语句
Returns: Returns:
API响应数据 API响应数据
""" """
payload = { payload = {
"inputs": {}, "inputs": {},
"query" : query, "query" : query,
"response_mode": "blocking", # Changed from streaming to blocking for consistency "response_mode": "blocking", # Changed from streaming to blocking for consistency
"user": "admin" "user": "admin"
} }
try: try:
logger.info(f"调用带query参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}") logger.info(f"调用带query参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
json=payload, # Using json parameter instead of data json=payload, # Using json parameter instead of data
timeout=300 # Reduced timeout to more reasonable value timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}") logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() if response.content else {} "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return { return {
"success": False, "success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}", "error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code "status_code": e.response.status_code
} }
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
"success": False, "success": False,
"error": str(e) "error": str(e)
} }
def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
"""直接调用不带inputs参数的API """直接调用不带inputs参数的API
Args: Args:
other_params: 其他参数字典 other_params: 其他参数字典
Returns: Returns:
API响应数据 API响应数据
""" """
payload = { payload = {
"inputs": {}, "inputs": {},
"query":"", "query":"",
"response_mode": "blocking", # Changed from streaming to blocking for consistency "response_mode": "blocking", # Changed from streaming to blocking for consistency
"user": "admin" "user": "admin"
} }
# 添加其他参数 # 添加其他参数
if other_params: if other_params:
payload.update(other_params) payload.update(other_params)
try: try:
logger.info(f"调用不带query参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}") logger.info(f"调用不带query参数的APIpayload: {json.dumps(payload, ensure_ascii=False)}")
response = requests.post( response = requests.post(
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
json=payload, # Using json parameter instead of data json=payload, # Using json parameter instead of data
timeout=300 # Reduced timeout to more reasonable value timeout=300 # Reduced timeout to more reasonable value
) )
logger.info(f"API调用完成状态码: {response.status_code}") logger.info(f"API调用完成状态码: {response.status_code}")
logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response
response.raise_for_status() response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}") logger.info(f"API调用成功状态码: {response.status_code}")
return { return {
"success": True, "success": True,
"status_code": response.status_code, "status_code": response.status_code,
"data": response.json() if response.content else {} "data": response.json() if response.content else {}
} }
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
return { return {
"success": False, "success": False,
"error": f"HTTP {e.response.status_code}: {e.response.text}", "error": f"HTTP {e.response.status_code}: {e.response.text}",
"status_code": e.response.status_code "status_code": e.response.status_code
} }
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}") logger.error(f"API调用失败: {e}")
return { return {
"success": False, "success": False,
"error": str(e) "error": str(e)
} }
class DatabaseManager: class DatabaseManager:
"""数据库管理类""" """数据库管理类"""
def __init__(self, db_config: Dict[str, str]): def __init__(self, db_config: Dict[str, str]):
self.db_config = db_config self.db_config = db_config
self.connection = None self.connection = None
def connect(self): def connect(self):
"""连接数据库""" """连接数据库"""
try: try:
self.connection = psycopg2.connect( self.connection = psycopg2.connect(
host=self.db_config.get('host', 'localhost'), host=self.db_config.get('host', 'localhost'),
port=self.db_config.get('port', '5432'), port=self.db_config.get('port', '5432'),
database=self.db_config.get('database', 'postgres'), database=self.db_config.get('database', 'postgres'),
user=self.db_config.get('user', 'postgres'), user=self.db_config.get('user', 'postgres'),
password=self.db_config.get('password', ''), password=self.db_config.get('password', ''),
cursor_factory=RealDictCursor cursor_factory=RealDictCursor
) )
logger.info("数据库连接成功") logger.info("数据库连接成功")
except Exception as e: except Exception as e:
logger.error(f"数据库连接失败: {e}") logger.error(f"数据库连接失败: {e}")
raise raise
def disconnect(self): def disconnect(self):
"""断开数据库连接""" """断开数据库连接"""
if self.connection: if self.connection:
self.connection.close() self.connection.close()
logger.info("数据库连接已关闭") logger.info("数据库连接已关闭")
def get_urls_from_database(self, query: str = None) -> List[str]: def get_urls_from_database(self, query: str = None) -> List[str]:
"""从数据库获取URL列表""" """从数据库获取URL列表"""
if not self.connection: if not self.connection:
self.connect() self.connect()
# 默认查询语句,根据实际情况修改 # 默认查询语句,根据实际情况修改
if query is None: if query is None:
query = "SELECT 1" query = "SELECT 1"
try: try:
with self.connection.cursor() as cursor: with self.connection.cursor() as cursor:
cursor.execute(query) cursor.execute(query)
results = cursor.fetchall() results = cursor.fetchall()
# 处理结果,返回字符串列表 # 处理结果,返回字符串列表
datas = [list(row.values())[0] for row in results] datas = [list(row.values())[0] for row in results]
return datas return datas
except Exception as e: except Exception as e:
logger.error(f"查询数据库失败: {e}") logger.error(f"查询数据库失败: {e}")
return [] return []
def process_single_item(api_client, item_text, type, item_index, total_count): def process_single_item(api_client, item_text, type, item_index, total_count):
"""处理单个项目""" """处理单个项目"""
logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item_text}") logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item_text}")
try: try:
if type == 'workflow': if type == 'workflow':
# 方法1: 使用带inputs参数的调用 # 方法1: 使用带inputs参数的调用
inputs_data = { inputs_data = {
'companys': f"{item_text}" 'companys': f"{item_text}"
} }
print(inputs_data) print(inputs_data)
result = api_client.call_api_with_inputs(inputs_data) result = api_client.call_api_with_inputs(inputs_data)
if result['success']: if result['success']:
logger.info(f"数据 {item_text} 处理成功") logger.info(f"数据 {item_text} 处理成功")
else: else:
logger.error(f"数据 {item_text} 处理失败: {result.get('error')}") logger.error(f"数据 {item_text} 处理失败: {result.get('error')}")
return result return result
else: else:
#agent #agent
query = item_text query = item_text
result = api_client.call_api_with_query(query) result = api_client.call_api_with_query(query)
if result['success']: if result['success']:
logger.info(f"数据 {item_text} 处理成功") logger.info(f"数据 {item_text} 处理成功")
else: else:
logger.error(f"数据 {item_text} 处理失败: {result.get('error')}") logger.error(f"数据 {item_text} 处理失败: {result.get('error')}")
return result return result
except Exception as e: except Exception as e:
logger.error(f"处理数据 {item_text} 时发生异常: {e}") logger.error(f"处理数据 {item_text} 时发生异常: {e}")
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp #DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp
#DATABASE_SCHEMA=p70_ai_intelligence #DATABASE_SCHEMA=p70_ai_intelligence
def main(): def main():
"""主函数""" """主函数"""
# 配置并发数 # 配置并发数
MAX_WORKERS = 3 # 可调整为5或10 MAX_WORKERS = 3 # 可调整为5或10
# 数据库配置 # 数据库配置
db_config = { db_config = {
'host': '124.221.232.219', 'host': '124.221.232.219',
'port': '5432', 'port': '5432',
'database': 'daas_mpp', 'database': 'daas_mpp',
'user': 'dbuser_dba', 'user': 'dbuser_dba',
'password': 'EmBRxnmmjnE3', 'password': 'EmBRxnmmjnE3',
'schema': 'p70_ai_intelligence' 'schema': 'p70_ai_intelligence'
} }
# API配置 # API配置
api_config = { api_config = {
'url': 'https://tk-agent.idgvalue.com/v1/workflows/run', 'url': 'https://tk-agent.idgvalue.com/v1/workflows/run',
'auth_token': 'Bearer app-pv9rLM4ukTRayP2YD60cS4k5' 'auth_token': 'Bearer app-pv9rLM4ukTRayP2YD60cS4k5'
} }
api_client = APIClient(api_config['url'], api_config['auth_token']) api_client = APIClient(api_config['url'], api_config['auth_token'])
type = 'workflow' type = 'workflow'
try: try:
flag = True flag = True
if flag: if flag:
# 初始化 # 初始化
db_manager = DatabaseManager(db_config) db_manager = DatabaseManager(db_config)
custom_query = """ custom_query = """
WITH numbered_names AS ( WITH numbered_names AS (
SELECT SELECT
name, name,
(ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /5 as batch_num (ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /5 as batch_num
FROM p30_common.v_sql_cleaned_cn_d_account_info FROM p30_common.v_sql_cleaned_cn_d_account_info
WHERE name NOT IN ( WHERE name NOT IN (
SELECT "search" SELECT "search"
FROM p70_ai_intelligence.agent_account_info FROM p70_ai_intelligence.agent_account_info
WHERE "search" IS NOT NULL WHERE "search" IS NOT NULL
limit 100 )
) and name NOT IN (
and name NOT IN ( SELECT name
SELECT name FROM p70_ai_intelligence.agent_execp_account
FROM p70_ai_intelligence.agent_execp_account WHERE name IS NOT NULL
WHERE name IS NOT NULL )
) order by dw_account
limit 200 limit 5
) )
SELECT SELECT
STRING_AGG(name, E'\n') as batched_names STRING_AGG(name, E'\n') as batched_names
FROM numbered_names FROM numbered_names
GROUP BY batch_num GROUP BY batch_num
ORDER BY batch_num; ORDER BY batch_num;
""" """
try: try:
# 从数据库获取URL列表 # 从数据库获取URL列表
list = db_manager.get_urls_from_database(custom_query) list = db_manager.get_urls_from_database(custom_query)
if not list: if not list:
logger.warning("未获取到URL") logger.warning("未获取到URL")
return return
# 展平所有数据项为单个列表 # 展平所有数据项为单个列表
# all_items = [] # all_items = []
# for batch in list: # for batch in list:
# items = batch.split('\n') # items = batch.split('\n')
# all_items.extend(items) # all_items.extend(items)
logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程") logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程")
# 使用线程池并发处理 # 使用线程池并发处理
success_count = 0 success_count = 0
failed_count = 0 failed_count = 0
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 提交所有任务 # 提交所有任务
future_to_item = { future_to_item = {
executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item
for i, item in enumerate(list) for i, item in enumerate(list)
} }
# 收集结果 # 收集结果
for future in as_completed(future_to_item): for future in as_completed(future_to_item):
item = future_to_item[future] item = future_to_item[future]
try: try:
result = future.result() result = future.result()
if result.get("success"): if result.get("success"):
success_count += 1 success_count += 1
else: else:
failed_count += 1 failed_count += 1
except Exception as e: except Exception as e:
logger.error(f"处理项目 {item} 时发生异常: {e}") logger.error(f"处理项目 {item} 时发生异常: {e}")
failed_count += 1 failed_count += 1
logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}") logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}")
except Exception as e: except Exception as e:
logger.error(f"程序执行失败: {e}") logger.error(f"程序执行失败: {e}")
finally: finally:
db_manager.disconnect() db_manager.disconnect()
else: else:
logger.info("调用不带inputs参数的API示例") logger.info("调用不带inputs参数的API示例")
if type == 'workflow': if type == 'workflow':
result2 = api_client.call_api_without_inputs() result2 = api_client.call_api_without_inputs()
if result2['success']: if result2['success']:
logger.info("不带inputs参数的API调用成功") logger.info("不带inputs参数的API调用成功")
else: else:
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
else: else:
#agent #agent
result2 = api_client.call_api_without_query() result2 = api_client.call_api_without_query()
if result2['success']: if result2['success']:
logger.info("不带inputs参数的API调用成功") logger.info("不带inputs参数的API调用成功")
else: else:
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
except Exception as e: except Exception as e:
logger.error(f"初始化失败: {e}") logger.error(f"初始化失败: {e}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()