diff --git a/dev/workflow/TK_Cust/account_info_complete/【153】客户基础信息补全/account_complete_153.py b/dev/workflow/TK_Cust/account_info_complete/【153】客户基础信息补全/account_complete_153.py index cc8aaa4..a8bf7b5 100644 --- a/dev/workflow/TK_Cust/account_info_complete/【153】客户基础信息补全/account_complete_153.py +++ b/dev/workflow/TK_Cust/account_info_complete/【153】客户基础信息补全/account_complete_153.py @@ -5,6 +5,8 @@ from psycopg2.extras import RealDictCursor import time from typing import List, Dict, Any import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +import threading # 配置日志 logging.basicConfig( @@ -32,30 +34,41 @@ class APIClient: Returns: API响应数据 """ - payload = json.dumps({ + payload = { "inputs": inputs, "response_mode": "blocking", "user": "admin" - }) + } + logger.info(f"调用API,payload: {json.dumps(payload, ensure_ascii=False)}") try: logger.info("调用带inputs参数的API") response = requests.post( self.api_url, headers=self.headers, - data=payload, - timeout=1200 + json=payload, # Using json parameter instead of data to let requests handle serialization + timeout=300 # Reduced timeout to more reasonable value ) + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + response.raise_for_status() logger.info(f"API调用成功,状态码: {response.status_code}") return { "success": True, "status_code": response.status_code, - "data": response.json() + "data": response.json() if response.content else {} } + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } except requests.exceptions.RequestException as e: logger.error(f"API调用失败: {e}") return { @@ -63,6 +76,7 @@ class APIClient: "error": str(e) } + def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: """直接调用不带inputs参数的API @@ -83,23 +97,33 @@ class APIClient: payload.update(other_params) try: - logger.info("调用不带inputs参数的API") + logger.info(f"调用不带inputs参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") response = requests.post( self.api_url, headers=self.headers, - data=json.dumps(payload), - timeout=1200 + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value ) + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + response.raise_for_status() logger.info(f"API调用成功,状态码: {response.status_code}") return { "success": True, "status_code": response.status_code, - "data": response.json() + "data": response.json() if response.content else {} } + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } except requests.exceptions.RequestException as e: logger.error(f"API调用失败: {e}") return { @@ -117,31 +141,41 @@ class APIClient: Returns: API响应数据 """ - payload = json.dumps({ + payload = { "inputs": {}, "query" : query, - "response_mode": "streaming", + "response_mode": "blocking", # Changed from streaming to blocking for consistency "user": "admin" - }) + } try: - logger.info("调用带inputs参数的API") + logger.info(f"调用带query参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") response = requests.post( self.api_url, headers=self.headers, - data=payload, - timeout=2400 + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value ) + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + response.raise_for_status() logger.info(f"API调用成功,状态码: {response.status_code}") return { "success": True, "status_code": response.status_code, - "data": response.json() + "data": response.json() if response.content else {} } + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } except requests.exceptions.RequestException as e: logger.error(f"API调用失败: {e}") return { @@ -161,7 +195,7 @@ class APIClient: payload = { "inputs": {}, "query":"", - "response_mode": "streaming", + "response_mode": "blocking", # Changed from streaming to blocking for consistency "user": "admin" } @@ -170,23 +204,33 @@ class APIClient: payload.update(other_params) try: - logger.info("调用不带inputs参数的API") + logger.info(f"调用不带query参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") response = requests.post( self.api_url, headers=self.headers, - data=json.dumps(payload), - timeout=2400 + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value ) + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + response.raise_for_status() logger.info(f"API调用成功,状态码: {response.status_code}") return { "success": True, "status_code": response.status_code, - "data": response.json() + "data": response.json() if response.content else {} } + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } except requests.exceptions.RequestException as e: logger.error(f"API调用失败: {e}") return { @@ -244,10 +288,49 @@ class DatabaseManager: except Exception as e: logger.error(f"查询数据库失败: {e}") return [] + +def process_single_item(api_client, item_text, type, item_index, total_count): + """处理单个项目""" + logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item_text}") + + try: + if type == 'workflow': + # 方法1: 使用带inputs参数的调用 + inputs_data = { + 'companys': f"{item_text}" + } + print(inputs_data) + + result = api_client.call_api_with_inputs(inputs_data) + + if result['success']: + logger.info(f"数据 {item_text} 处理成功") + else: + logger.error(f"数据 {item_text} 处理失败: {result.get('error')}") + return result + + else: + #agent + query = item_text + result = api_client.call_api_with_query(query) + + if result['success']: + logger.info(f"数据 {item_text} 处理成功") + else: + logger.error(f"数据 {item_text} 处理失败: {result.get('error')}") + return result + + except Exception as e: + logger.error(f"处理数据 {item_text} 时发生异常: {e}") + return {"success": False, "error": str(e)} + #DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp #DATABASE_SCHEMA=p70_ai_intelligence def main(): """主函数""" + # 配置并发数 + MAX_WORKERS = 3 # 可调整为5或10 + # 数据库配置 db_config = { 'host': '124.221.232.219', @@ -260,13 +343,12 @@ def main(): # API配置 api_config = { - 'url': 'https://tk-agent.idgvalue.com/v1/workflows/run', - 'auth_token': 'Bearer app-pv9rLM4ukTRayP2YD60cS4k5' + 'url': 'http://10.168.1.153:18000/v1/workflows/run', + 'auth_token': 'Bearer app-JYQCHu09hlZn0b0OUVW3PRdr' } api_client = APIClient(api_config['url'], api_config['auth_token']) - type = 'agent' type = 'workflow' try: @@ -275,7 +357,33 @@ def main(): if flag: # 初始化 db_manager = DatabaseManager(db_config) - custom_query = """ ${custom_query} """ + custom_query = """ +WITH numbered_names AS ( + SELECT + name, + (ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /5 as batch_num + FROM p30_common.v_sql_cleaned_cn_d_account_info + WHERE name NOT IN ( + SELECT "search" + FROM p70_ai_intelligence.agent_account_info + WHERE "search" IS NOT NULL + ) + and name NOT IN ( + SELECT name + FROM p70_ai_intelligence.agent_execp_account + WHERE name IS NOT NULL + ) + order by dw_account + offset 2250 + limit 4 +) +SELECT + STRING_AGG(name, E'\n') as batched_names +FROM numbered_names +GROUP BY batch_num +ORDER BY batch_num; + +""" try: # 从数据库获取URL列表 @@ -285,37 +393,39 @@ def main(): logger.warning("未获取到URL") return - # 遍历每个URL调用API - for i, text in enumerate(list, 1): - logger.info(f"处理第 {i}/{len(list)} 个数据: {text}") + # 展平所有数据项为单个列表 + # all_items = [] + # for batch in list: + # items = batch.split('\n') + # all_items.extend(items) + + logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程") + + # 使用线程池并发处理 + success_count = 0 + failed_count = 0 + + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + # 提交所有任务 + future_to_item = { + executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item + for i, item in enumerate(list) + } - if type is 'workflow': - # 方法1: 使用带inputs参数的调用 - inputs_data = { - "urls": f"{text}" - } - - result = api_client.call_api_with_inputs(inputs_data) - - if result['success']: - logger.info(f"URL {text} 处理成功") - else: - logger.error(f"URL {text} 处理失败: {result.get('error')}") - - - else: - #agent - query = text - result = api_client.call_api_with_query(query) - - if result['success']: - logger.info(f"URL {text} 处理成功") - else: - logger.error(f"URL {text} 处理失败: {result.get('error')}") - - # 可选:添加延迟避免请求过于频繁 - if i < len(text): - time.sleep(1) + # 收集结果 + for future in as_completed(future_to_item): + item = future_to_item[future] + try: + result = future.result() + if result.get("success"): + success_count += 1 + else: + failed_count += 1 + except Exception as e: + logger.error(f"处理项目 {item} 时发生异常: {e}") + failed_count += 1 + + logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}") except Exception as e: logger.error(f"程序执行失败: {e}") @@ -325,7 +435,7 @@ def main(): else: logger.info("调用不带inputs参数的API示例") - if type is 'workflow': + if type == 'workflow': result2 = api_client.call_api_without_inputs() if result2['success']: diff --git a/dev/workflow/TK_Cust/account_info_complete/【156】客户基础信息补全/account_info_complete.py b/dev/workflow/TK_Cust/account_info_complete/【156】客户基础信息补全/account_info_complete.py index 14f5ca6..ddf272a 100644 --- a/dev/workflow/TK_Cust/account_info_complete/【156】客户基础信息补全/account_info_complete.py +++ b/dev/workflow/TK_Cust/account_info_complete/【156】客户基础信息补全/account_info_complete.py @@ -34,31 +34,41 @@ class APIClient: Returns: API响应数据 """ - payload = json.dumps({ + payload = { "inputs": inputs, "response_mode": "blocking", "user": "admin" - }) - logger.info(f"调用API,payload: {payload}") + } + logger.info(f"调用API,payload: {json.dumps(payload, ensure_ascii=False)}") try: logger.info("调用带inputs参数的API") response = requests.post( self.api_url, headers=self.headers, - data=payload, - timeout=2400 + json=payload, # Using json parameter instead of data to let requests handle serialization + timeout=300 # Reduced timeout to more reasonable value ) + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + response.raise_for_status() logger.info(f"API调用成功,状态码: {response.status_code}") return { "success": True, "status_code": response.status_code, - "data": response.json() + "data": response.json() if response.content else {} } + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } except requests.exceptions.RequestException as e: logger.error(f"API调用失败: {e}") return { @@ -87,23 +97,33 @@ class APIClient: payload.update(other_params) try: - logger.info("调用不带inputs参数的API") + logger.info(f"调用不带inputs参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") response = requests.post( self.api_url, headers=self.headers, - data=json.dumps(payload), - timeout=1200 + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value ) + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + response.raise_for_status() logger.info(f"API调用成功,状态码: {response.status_code}") return { "success": True, "status_code": response.status_code, - "data": response.json() + "data": response.json() if response.content else {} } + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } except requests.exceptions.RequestException as e: logger.error(f"API调用失败: {e}") return { @@ -121,31 +141,41 @@ class APIClient: Returns: API响应数据 """ - payload = json.dumps({ + payload = { "inputs": {}, "query" : query, - "response_mode": "streaming", + "response_mode": "blocking", # Changed from streaming to blocking for consistency "user": "admin" - }) + } try: - logger.info("调用带inputs参数的API") + logger.info(f"调用带query参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") response = requests.post( self.api_url, headers=self.headers, - data=payload, - timeout=2400 + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value ) + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + response.raise_for_status() logger.info(f"API调用成功,状态码: {response.status_code}") return { "success": True, "status_code": response.status_code, - "data": response.json() + "data": response.json() if response.content else {} } + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } except requests.exceptions.RequestException as e: logger.error(f"API调用失败: {e}") return { @@ -165,7 +195,7 @@ class APIClient: payload = { "inputs": {}, "query":"", - "response_mode": "streaming", + "response_mode": "blocking", # Changed from streaming to blocking for consistency "user": "admin" } @@ -174,23 +204,33 @@ class APIClient: payload.update(other_params) try: - logger.info("调用不带inputs参数的API") + logger.info(f"调用不带query参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") response = requests.post( self.api_url, headers=self.headers, - data=json.dumps(payload), - timeout=2400 + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value ) + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + response.raise_for_status() logger.info(f"API调用成功,状态码: {response.status_code}") return { "success": True, "status_code": response.status_code, - "data": response.json() + "data": response.json() if response.content else {} } + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } except requests.exceptions.RequestException as e: logger.error(f"API调用失败: {e}") return { @@ -259,6 +299,7 @@ def process_single_item(api_client, item_text, type, item_index, total_count): inputs_data = { 'companys': f"{item_text}" } + print(inputs_data) result = api_client.call_api_with_inputs(inputs_data) @@ -288,7 +329,7 @@ def process_single_item(api_client, item_text, type, item_index, total_count): def main(): """主函数""" # 配置并发数 - MAX_WORKERS = 10 # 可调整为5或10 + MAX_WORKERS = 3 # 可调整为5或10 # 数据库配置 db_config = { @@ -308,7 +349,6 @@ def main(): api_client = APIClient(api_config['url'], api_config['auth_token']) - type = 'agent' type = 'workflow' try: @@ -321,7 +361,7 @@ def main(): WITH numbered_names AS ( SELECT name, - (ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /10 as batch_num + (ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /5 as batch_num FROM p30_common.v_sql_cleaned_cn_d_account_info WHERE name NOT IN ( SELECT "search" @@ -333,7 +373,9 @@ WITH numbered_names AS ( FROM p70_ai_intelligence.agent_execp_account WHERE name IS NOT NULL ) - limit 100 + order by dw_account + offset 750 + limit 5 ) SELECT STRING_AGG(name, E'\n') as batched_names diff --git a/dev/workflow/TK_Cust/account_info_complete/【162】客户基础信息补全/account_complete_162.py b/dev/workflow/TK_Cust/account_info_complete/【162】客户基础信息补全/account_complete_162.py index cc8aaa4..bf5e9d4 100644 --- a/dev/workflow/TK_Cust/account_info_complete/【162】客户基础信息补全/account_complete_162.py +++ b/dev/workflow/TK_Cust/account_info_complete/【162】客户基础信息补全/account_complete_162.py @@ -1,351 +1,461 @@ -import requests -import json -import psycopg2 -from psycopg2.extras import RealDictCursor -import time -from typing import List, Dict, Any -import logging - -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - -class APIClient: - """简化的API客户端类""" - - def __init__(self, api_url: str, auth_token: str): - self.api_url = api_url - self.headers = { - 'Authorization': auth_token, - 'Content-Type': 'application/json' - } - - def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]: - """直接调用带inputs参数的API - - Args: - inputs: inputs参数字典 - - Returns: - API响应数据 - """ - payload = json.dumps({ - "inputs": inputs, - "response_mode": "blocking", - "user": "admin" - }) - - try: - logger.info("调用带inputs参数的API") - response = requests.post( - self.api_url, - headers=self.headers, - data=payload, - timeout=1200 - ) - - response.raise_for_status() - logger.info(f"API调用成功,状态码: {response.status_code}") - - return { - "success": True, - "status_code": response.status_code, - "data": response.json() - } - - except requests.exceptions.RequestException as e: - logger.error(f"API调用失败: {e}") - return { - "success": False, - "error": str(e) - } - - def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: - """直接调用不带inputs参数的API - - Args: - other_params: 其他参数字典 - - Returns: - API响应数据 - """ - payload = { - "inputs": {}, - "response_mode": "blocking", - "user": "admin" - } - - # 添加其他参数 - if other_params: - payload.update(other_params) - - try: - logger.info("调用不带inputs参数的API") - response = requests.post( - self.api_url, - headers=self.headers, - data=json.dumps(payload), - timeout=1200 - ) - - response.raise_for_status() - logger.info(f"API调用成功,状态码: {response.status_code}") - - return { - "success": True, - "status_code": response.status_code, - "data": response.json() - } - - except requests.exceptions.RequestException as e: - logger.error(f"API调用失败: {e}") - return { - "success": False, - "error": str(e) - } - - - def call_api_with_query(self, query: str = None) -> Dict[str, Any]: - """直接调用带inputs参数的API - - Args: - query: 查询语句 - - Returns: - API响应数据 - """ - payload = json.dumps({ - "inputs": {}, - "query" : query, - "response_mode": "streaming", - "user": "admin" - }) - - try: - logger.info("调用带inputs参数的API") - response = requests.post( - self.api_url, - headers=self.headers, - data=payload, - timeout=2400 - ) - - response.raise_for_status() - logger.info(f"API调用成功,状态码: {response.status_code}") - - return { - "success": True, - "status_code": response.status_code, - "data": response.json() - } - - except requests.exceptions.RequestException as e: - logger.error(f"API调用失败: {e}") - return { - "success": False, - "error": str(e) - } - - def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: - """直接调用不带inputs参数的API - - Args: - other_params: 其他参数字典 - - Returns: - API响应数据 - """ - payload = { - "inputs": {}, - "query":"", - "response_mode": "streaming", - "user": "admin" - } - - # 添加其他参数 - if other_params: - payload.update(other_params) - - try: - logger.info("调用不带inputs参数的API") - response = requests.post( - self.api_url, - headers=self.headers, - data=json.dumps(payload), - timeout=2400 - ) - - response.raise_for_status() - logger.info(f"API调用成功,状态码: {response.status_code}") - - return { - "success": True, - "status_code": response.status_code, - "data": response.json() - } - - except requests.exceptions.RequestException as e: - logger.error(f"API调用失败: {e}") - return { - "success": False, - "error": str(e) - } - -class DatabaseManager: - """数据库管理类""" - - def __init__(self, db_config: Dict[str, str]): - self.db_config = db_config - self.connection = None - - def connect(self): - """连接数据库""" - try: - self.connection = psycopg2.connect( - host=self.db_config.get('host', 'localhost'), - port=self.db_config.get('port', '5432'), - database=self.db_config.get('database', 'postgres'), - user=self.db_config.get('user', 'postgres'), - password=self.db_config.get('password', ''), - cursor_factory=RealDictCursor - ) - logger.info("数据库连接成功") - except Exception as e: - logger.error(f"数据库连接失败: {e}") - raise - - def disconnect(self): - """断开数据库连接""" - if self.connection: - self.connection.close() - logger.info("数据库连接已关闭") - - def get_urls_from_database(self, query: str = None) -> List[str]: - """从数据库获取URL列表""" - if not self.connection: - self.connect() - - # 默认查询语句,根据实际情况修改 - if query is None: - query = "SELECT 1" - - try: - with self.connection.cursor() as cursor: - cursor.execute(query) - results = cursor.fetchall() - - # 处理结果,返回字符串列表 - datas = [list(row.values())[0] for row in results] - return datas - - except Exception as e: - logger.error(f"查询数据库失败: {e}") - return [] -#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp -#DATABASE_SCHEMA=p70_ai_intelligence -def main(): - """主函数""" - # 数据库配置 - db_config = { - 'host': '124.221.232.219', - 'port': '5432', - 'database': 'daas_mpp', - 'user': 'dbuser_dba', - 'password': 'EmBRxnmmjnE3', - 'schema': 'p70_ai_intelligence' - } - - # API配置 - api_config = { - 'url': 'https://tk-agent.idgvalue.com/v1/workflows/run', - 'auth_token': 'Bearer app-pv9rLM4ukTRayP2YD60cS4k5' - } - - api_client = APIClient(api_config['url'], api_config['auth_token']) - - type = 'agent' - type = 'workflow' - - try: - flag = True - - if flag: - # 初始化 - db_manager = DatabaseManager(db_config) - custom_query = """ ${custom_query} """ - - try: - # 从数据库获取URL列表 - list = db_manager.get_urls_from_database(custom_query) - - if not list: - logger.warning("未获取到URL") - return - - # 遍历每个URL调用API - for i, text in enumerate(list, 1): - logger.info(f"处理第 {i}/{len(list)} 个数据: {text}") - - if type is 'workflow': - # 方法1: 使用带inputs参数的调用 - inputs_data = { - "urls": f"{text}" - } - - result = api_client.call_api_with_inputs(inputs_data) - - if result['success']: - logger.info(f"URL {text} 处理成功") - else: - logger.error(f"URL {text} 处理失败: {result.get('error')}") - - - else: - #agent - query = text - result = api_client.call_api_with_query(query) - - if result['success']: - logger.info(f"URL {text} 处理成功") - else: - logger.error(f"URL {text} 处理失败: {result.get('error')}") - - # 可选:添加延迟避免请求过于频繁 - if i < len(text): - time.sleep(1) - - except Exception as e: - logger.error(f"程序执行失败: {e}") - - finally: - db_manager.disconnect() - - else: - logger.info("调用不带inputs参数的API示例") - if type is 'workflow': - result2 = api_client.call_api_without_inputs() - - if result2['success']: - logger.info("不带inputs参数的API调用成功") - else: - logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") - - else: - #agent - result2 = api_client.call_api_without_query() - - if result2['success']: - logger.info("不带inputs参数的API调用成功") - else: - logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") - - except Exception as e: - logger.error(f"初始化失败: {e}") - - - -if __name__ == "__main__": +import requests +import json +import psycopg2 +from psycopg2.extras import RealDictCursor +import time +from typing import List, Dict, Any +import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +import threading + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +class APIClient: + """简化的API客户端类""" + + def __init__(self, api_url: str, auth_token: str): + self.api_url = api_url + self.headers = { + 'Authorization': auth_token, + 'Content-Type': 'application/json' + } + + def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]: + """直接调用带inputs参数的API + + Args: + inputs: inputs参数字典 + + Returns: + API响应数据 + """ + payload = { + "inputs": inputs, + "response_mode": "blocking", + "user": "admin" + } + logger.info(f"调用API,payload: {json.dumps(payload, ensure_ascii=False)}") + + try: + logger.info("调用带inputs参数的API") + response = requests.post( + self.api_url, + headers=self.headers, + json=payload, # Using json parameter instead of data to let requests handle serialization + timeout=300 # Reduced timeout to more reasonable value + ) + + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + + response.raise_for_status() + logger.info(f"API调用成功,状态码: {response.status_code}") + + return { + "success": True, + "status_code": response.status_code, + "data": response.json() if response.content else {} + } + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } + except requests.exceptions.RequestException as e: + logger.error(f"API调用失败: {e}") + return { + "success": False, + "error": str(e) + } + + + def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: + """直接调用不带inputs参数的API + + Args: + other_params: 其他参数字典 + + Returns: + API响应数据 + """ + payload = { + "inputs": {}, + "response_mode": "blocking", + "user": "admin" + } + + # 添加其他参数 + if other_params: + payload.update(other_params) + + try: + logger.info(f"调用不带inputs参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") + response = requests.post( + self.api_url, + headers=self.headers, + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value + ) + + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + + response.raise_for_status() + logger.info(f"API调用成功,状态码: {response.status_code}") + + return { + "success": True, + "status_code": response.status_code, + "data": response.json() if response.content else {} + } + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } + except requests.exceptions.RequestException as e: + logger.error(f"API调用失败: {e}") + return { + "success": False, + "error": str(e) + } + + + def call_api_with_query(self, query: str = None) -> Dict[str, Any]: + """直接调用带inputs参数的API + + Args: + query: 查询语句 + + Returns: + API响应数据 + """ + payload = { + "inputs": {}, + "query" : query, + "response_mode": "blocking", # Changed from streaming to blocking for consistency + "user": "admin" + } + + try: + logger.info(f"调用带query参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") + response = requests.post( + self.api_url, + headers=self.headers, + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value + ) + + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + + response.raise_for_status() + logger.info(f"API调用成功,状态码: {response.status_code}") + + return { + "success": True, + "status_code": response.status_code, + "data": response.json() if response.content else {} + } + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } + except requests.exceptions.RequestException as e: + logger.error(f"API调用失败: {e}") + return { + "success": False, + "error": str(e) + } + + def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: + """直接调用不带inputs参数的API + + Args: + other_params: 其他参数字典 + + Returns: + API响应数据 + """ + payload = { + "inputs": {}, + "query":"", + "response_mode": "blocking", # Changed from streaming to blocking for consistency + "user": "admin" + } + + # 添加其他参数 + if other_params: + payload.update(other_params) + + try: + logger.info(f"调用不带query参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") + response = requests.post( + self.api_url, + headers=self.headers, + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value + ) + + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + + response.raise_for_status() + logger.info(f"API调用成功,状态码: {response.status_code}") + + return { + "success": True, + "status_code": response.status_code, + "data": response.json() if response.content else {} + } + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } + except requests.exceptions.RequestException as e: + logger.error(f"API调用失败: {e}") + return { + "success": False, + "error": str(e) + } + +class DatabaseManager: + """数据库管理类""" + + def __init__(self, db_config: Dict[str, str]): + self.db_config = db_config + self.connection = None + + def connect(self): + """连接数据库""" + try: + self.connection = psycopg2.connect( + host=self.db_config.get('host', 'localhost'), + port=self.db_config.get('port', '5432'), + database=self.db_config.get('database', 'postgres'), + user=self.db_config.get('user', 'postgres'), + password=self.db_config.get('password', ''), + cursor_factory=RealDictCursor + ) + logger.info("数据库连接成功") + except Exception as e: + logger.error(f"数据库连接失败: {e}") + raise + + def disconnect(self): + """断开数据库连接""" + if self.connection: + self.connection.close() + logger.info("数据库连接已关闭") + + def get_urls_from_database(self, query: str = None) -> List[str]: + """从数据库获取URL列表""" + if not self.connection: + self.connect() + + # 默认查询语句,根据实际情况修改 + if query is None: + query = "SELECT 1" + + try: + with self.connection.cursor() as cursor: + cursor.execute(query) + results = cursor.fetchall() + + # 处理结果,返回字符串列表 + datas = [list(row.values())[0] for row in results] + return datas + + except Exception as e: + logger.error(f"查询数据库失败: {e}") + return [] + +def process_single_item(api_client, item_text, type, item_index, total_count): + """处理单个项目""" + logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item_text}") + + try: + if type == 'workflow': + # 方法1: 使用带inputs参数的调用 + inputs_data = { + 'companys': f"{item_text}" + } + print(inputs_data) + + result = api_client.call_api_with_inputs(inputs_data) + + if result['success']: + logger.info(f"数据 {item_text} 处理成功") + else: + logger.error(f"数据 {item_text} 处理失败: {result.get('error')}") + return result + + else: + #agent + query = item_text + result = api_client.call_api_with_query(query) + + if result['success']: + logger.info(f"数据 {item_text} 处理成功") + else: + logger.error(f"数据 {item_text} 处理失败: {result.get('error')}") + return result + + except Exception as e: + logger.error(f"处理数据 {item_text} 时发生异常: {e}") + return {"success": False, "error": str(e)} + +#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp +#DATABASE_SCHEMA=p70_ai_intelligence +def main(): + """主函数""" + # 配置并发数 + MAX_WORKERS = 3 # 可调整为5或10 + + # 数据库配置 + db_config = { + 'host': '124.221.232.219', + 'port': '5432', + 'database': 'daas_mpp', + 'user': 'dbuser_dba', + 'password': 'EmBRxnmmjnE3', + 'schema': 'p70_ai_intelligence' + } + + # API配置 + api_config = { + 'url': 'http://10.168.1.162:18000/v1/workflows/run', + 'auth_token': 'Bearer app-C0iPJse2Iutj7D6sngAv2eUv' + } + + api_client = APIClient(api_config['url'], api_config['auth_token']) + + type = 'workflow' + + try: + flag = True + + if flag: + # 初始化 + db_manager = DatabaseManager(db_config) + custom_query = """ +WITH numbered_names AS ( + SELECT + name, + (ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /5 as batch_num + FROM p30_common.v_sql_cleaned_cn_d_account_info + WHERE name NOT IN ( + SELECT "search" + FROM p70_ai_intelligence.agent_account_info + WHERE "search" IS NOT NULL + ) + and name NOT IN ( + SELECT name + FROM p70_ai_intelligence.agent_execp_account + WHERE name IS NOT NULL + ) + order by dw_account + offset 1500 + limit 5 +) +SELECT + STRING_AGG(name, E'\n') as batched_names +FROM numbered_names +GROUP BY batch_num +ORDER BY batch_num; + +""" + + try: + # 从数据库获取URL列表 + list = db_manager.get_urls_from_database(custom_query) + + if not list: + logger.warning("未获取到URL") + return + + # 展平所有数据项为单个列表 + # all_items = [] + # for batch in list: + # items = batch.split('\n') + # all_items.extend(items) + + logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程") + + # 使用线程池并发处理 + success_count = 0 + failed_count = 0 + + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + # 提交所有任务 + future_to_item = { + executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item + for i, item in enumerate(list) + } + + # 收集结果 + for future in as_completed(future_to_item): + item = future_to_item[future] + try: + result = future.result() + if result.get("success"): + success_count += 1 + else: + failed_count += 1 + except Exception as e: + logger.error(f"处理项目 {item} 时发生异常: {e}") + failed_count += 1 + + logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}") + + except Exception as e: + logger.error(f"程序执行失败: {e}") + + finally: + db_manager.disconnect() + + else: + logger.info("调用不带inputs参数的API示例") + if type == 'workflow': + result2 = api_client.call_api_without_inputs() + + if result2['success']: + logger.info("不带inputs参数的API调用成功") + else: + logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") + + else: + #agent + result2 = api_client.call_api_without_query() + + if result2['success']: + logger.info("不带inputs参数的API调用成功") + else: + logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") + + except Exception as e: + logger.error(f"初始化失败: {e}") + + + +if __name__ == "__main__": main() \ No newline at end of file diff --git a/dev/workflow/TK_Cust/account_info_complete/【163】客户基础信息补全/account_complete_163.py b/dev/workflow/TK_Cust/account_info_complete/【163】客户基础信息补全/account_complete_163.py index 5566ab0..74b6090 100644 --- a/dev/workflow/TK_Cust/account_info_complete/【163】客户基础信息补全/account_complete_163.py +++ b/dev/workflow/TK_Cust/account_info_complete/【163】客户基础信息补全/account_complete_163.py @@ -1,460 +1,460 @@ -import requests -import json -import psycopg2 -from psycopg2.extras import RealDictCursor -import time -from typing import List, Dict, Any -import logging -from concurrent.futures import ThreadPoolExecutor, as_completed -import threading - -# 配置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - -class APIClient: - """简化的API客户端类""" - - def __init__(self, api_url: str, auth_token: str): - self.api_url = api_url - self.headers = { - 'Authorization': auth_token, - 'Content-Type': 'application/json' - } - - def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]: - """直接调用带inputs参数的API - - Args: - inputs: inputs参数字典 - - Returns: - API响应数据 - """ - payload = { - "inputs": inputs, - "response_mode": "blocking", - "user": "admin" - } - logger.info(f"调用API,payload: {json.dumps(payload, ensure_ascii=False)}") - - try: - logger.info("调用带inputs参数的API") - response = requests.post( - self.api_url, - headers=self.headers, - json=payload, # Using json parameter instead of data to let requests handle serialization - timeout=300 # Reduced timeout to more reasonable value - ) - - logger.info(f"API调用完成,状态码: {response.status_code}") - logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response - - response.raise_for_status() - logger.info(f"API调用成功,状态码: {response.status_code}") - - return { - "success": True, - "status_code": response.status_code, - "data": response.json() if response.content else {} - } - - except requests.exceptions.HTTPError as e: - logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") - return { - "success": False, - "error": f"HTTP {e.response.status_code}: {e.response.text}", - "status_code": e.response.status_code - } - except requests.exceptions.RequestException as e: - logger.error(f"API调用失败: {e}") - return { - "success": False, - "error": str(e) - } - - - def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: - """直接调用不带inputs参数的API - - Args: - other_params: 其他参数字典 - - Returns: - API响应数据 - """ - payload = { - "inputs": {}, - "response_mode": "blocking", - "user": "admin" - } - - # 添加其他参数 - if other_params: - payload.update(other_params) - - try: - logger.info(f"调用不带inputs参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") - response = requests.post( - self.api_url, - headers=self.headers, - json=payload, # Using json parameter instead of data - timeout=300 # Reduced timeout to more reasonable value - ) - - logger.info(f"API调用完成,状态码: {response.status_code}") - logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response - - response.raise_for_status() - logger.info(f"API调用成功,状态码: {response.status_code}") - - return { - "success": True, - "status_code": response.status_code, - "data": response.json() if response.content else {} - } - - except requests.exceptions.HTTPError as e: - logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") - return { - "success": False, - "error": f"HTTP {e.response.status_code}: {e.response.text}", - "status_code": e.response.status_code - } - except requests.exceptions.RequestException as e: - logger.error(f"API调用失败: {e}") - return { - "success": False, - "error": str(e) - } - - - def call_api_with_query(self, query: str = None) -> Dict[str, Any]: - """直接调用带inputs参数的API - - Args: - query: 查询语句 - - Returns: - API响应数据 - """ - payload = { - "inputs": {}, - "query" : query, - "response_mode": "blocking", # Changed from streaming to blocking for consistency - "user": "admin" - } - - try: - logger.info(f"调用带query参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") - response = requests.post( - self.api_url, - headers=self.headers, - json=payload, # Using json parameter instead of data - timeout=300 # Reduced timeout to more reasonable value - ) - - logger.info(f"API调用完成,状态码: {response.status_code}") - logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response - - response.raise_for_status() - logger.info(f"API调用成功,状态码: {response.status_code}") - - return { - "success": True, - "status_code": response.status_code, - "data": response.json() if response.content else {} - } - - except requests.exceptions.HTTPError as e: - logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") - return { - "success": False, - "error": f"HTTP {e.response.status_code}: {e.response.text}", - "status_code": e.response.status_code - } - except requests.exceptions.RequestException as e: - logger.error(f"API调用失败: {e}") - return { - "success": False, - "error": str(e) - } - - def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: - """直接调用不带inputs参数的API - - Args: - other_params: 其他参数字典 - - Returns: - API响应数据 - """ - payload = { - "inputs": {}, - "query":"", - "response_mode": "blocking", # Changed from streaming to blocking for consistency - "user": "admin" - } - - # 添加其他参数 - if other_params: - payload.update(other_params) - - try: - logger.info(f"调用不带query参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") - response = requests.post( - self.api_url, - headers=self.headers, - json=payload, # Using json parameter instead of data - timeout=300 # Reduced timeout to more reasonable value - ) - - logger.info(f"API调用完成,状态码: {response.status_code}") - logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response - - response.raise_for_status() - logger.info(f"API调用成功,状态码: {response.status_code}") - - return { - "success": True, - "status_code": response.status_code, - "data": response.json() if response.content else {} - } - - except requests.exceptions.HTTPError as e: - logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") - return { - "success": False, - "error": f"HTTP {e.response.status_code}: {e.response.text}", - "status_code": e.response.status_code - } - except requests.exceptions.RequestException as e: - logger.error(f"API调用失败: {e}") - return { - "success": False, - "error": str(e) - } - -class DatabaseManager: - """数据库管理类""" - - def __init__(self, db_config: Dict[str, str]): - self.db_config = db_config - self.connection = None - - def connect(self): - """连接数据库""" - try: - self.connection = psycopg2.connect( - host=self.db_config.get('host', 'localhost'), - port=self.db_config.get('port', '5432'), - database=self.db_config.get('database', 'postgres'), - user=self.db_config.get('user', 'postgres'), - password=self.db_config.get('password', ''), - cursor_factory=RealDictCursor - ) - logger.info("数据库连接成功") - except Exception as e: - logger.error(f"数据库连接失败: {e}") - raise - - def disconnect(self): - """断开数据库连接""" - if self.connection: - self.connection.close() - logger.info("数据库连接已关闭") - - def get_urls_from_database(self, query: str = None) -> List[str]: - """从数据库获取URL列表""" - if not self.connection: - self.connect() - - # 默认查询语句,根据实际情况修改 - if query is None: - query = "SELECT 1" - - try: - with self.connection.cursor() as cursor: - cursor.execute(query) - results = cursor.fetchall() - - # 处理结果,返回字符串列表 - datas = [list(row.values())[0] for row in results] - return datas - - except Exception as e: - logger.error(f"查询数据库失败: {e}") - return [] - -def process_single_item(api_client, item_text, type, item_index, total_count): - """处理单个项目""" - logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item_text}") - - try: - if type == 'workflow': - # 方法1: 使用带inputs参数的调用 - inputs_data = { - 'companys': f"{item_text}" - } - print(inputs_data) - - result = api_client.call_api_with_inputs(inputs_data) - - if result['success']: - logger.info(f"数据 {item_text} 处理成功") - else: - logger.error(f"数据 {item_text} 处理失败: {result.get('error')}") - return result - - else: - #agent - query = item_text - result = api_client.call_api_with_query(query) - - if result['success']: - logger.info(f"数据 {item_text} 处理成功") - else: - logger.error(f"数据 {item_text} 处理失败: {result.get('error')}") - return result - - except Exception as e: - logger.error(f"处理数据 {item_text} 时发生异常: {e}") - return {"success": False, "error": str(e)} - -#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp -#DATABASE_SCHEMA=p70_ai_intelligence -def main(): - """主函数""" - # 配置并发数 - MAX_WORKERS = 3 # 可调整为5或10 - - # 数据库配置 - db_config = { - 'host': '124.221.232.219', - 'port': '5432', - 'database': 'daas_mpp', - 'user': 'dbuser_dba', - 'password': 'EmBRxnmmjnE3', - 'schema': 'p70_ai_intelligence' - } - - # API配置 - api_config = { - 'url': 'https://tk-agent.idgvalue.com/v1/workflows/run', - 'auth_token': 'Bearer app-pv9rLM4ukTRayP2YD60cS4k5' - } - - api_client = APIClient(api_config['url'], api_config['auth_token']) - - type = 'workflow' - - try: - flag = True - - if flag: - # 初始化 - db_manager = DatabaseManager(db_config) - custom_query = """ -WITH numbered_names AS ( - SELECT - name, - (ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /5 as batch_num - FROM p30_common.v_sql_cleaned_cn_d_account_info - WHERE name NOT IN ( - SELECT "search" - FROM p70_ai_intelligence.agent_account_info - WHERE "search" IS NOT NULL - limit 100 - ) - and name NOT IN ( - SELECT name - FROM p70_ai_intelligence.agent_execp_account - WHERE name IS NOT NULL - ) - limit 200 -) -SELECT - STRING_AGG(name, E'\n') as batched_names -FROM numbered_names -GROUP BY batch_num -ORDER BY batch_num; - -""" - - try: - # 从数据库获取URL列表 - list = db_manager.get_urls_from_database(custom_query) - - if not list: - logger.warning("未获取到URL") - return - - # 展平所有数据项为单个列表 - # all_items = [] - # for batch in list: - # items = batch.split('\n') - # all_items.extend(items) - - logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程") - - # 使用线程池并发处理 - success_count = 0 - failed_count = 0 - - with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - # 提交所有任务 - future_to_item = { - executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item - for i, item in enumerate(list) - } - - # 收集结果 - for future in as_completed(future_to_item): - item = future_to_item[future] - try: - result = future.result() - if result.get("success"): - success_count += 1 - else: - failed_count += 1 - except Exception as e: - logger.error(f"处理项目 {item} 时发生异常: {e}") - failed_count += 1 - - logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}") - - except Exception as e: - logger.error(f"程序执行失败: {e}") - - finally: - db_manager.disconnect() - - else: - logger.info("调用不带inputs参数的API示例") - if type == 'workflow': - result2 = api_client.call_api_without_inputs() - - if result2['success']: - logger.info("不带inputs参数的API调用成功") - else: - logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") - - else: - #agent - result2 = api_client.call_api_without_query() - - if result2['success']: - logger.info("不带inputs参数的API调用成功") - else: - logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") - - except Exception as e: - logger.error(f"初始化失败: {e}") - - - -if __name__ == "__main__": +import requests +import json +import psycopg2 +from psycopg2.extras import RealDictCursor +import time +from typing import List, Dict, Any +import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +import threading + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +class APIClient: + """简化的API客户端类""" + + def __init__(self, api_url: str, auth_token: str): + self.api_url = api_url + self.headers = { + 'Authorization': auth_token, + 'Content-Type': 'application/json' + } + + def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]: + """直接调用带inputs参数的API + + Args: + inputs: inputs参数字典 + + Returns: + API响应数据 + """ + payload = { + "inputs": inputs, + "response_mode": "blocking", + "user": "admin" + } + logger.info(f"调用API,payload: {json.dumps(payload, ensure_ascii=False)}") + + try: + logger.info("调用带inputs参数的API") + response = requests.post( + self.api_url, + headers=self.headers, + json=payload, # Using json parameter instead of data to let requests handle serialization + timeout=300 # Reduced timeout to more reasonable value + ) + + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + + response.raise_for_status() + logger.info(f"API调用成功,状态码: {response.status_code}") + + return { + "success": True, + "status_code": response.status_code, + "data": response.json() if response.content else {} + } + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } + except requests.exceptions.RequestException as e: + logger.error(f"API调用失败: {e}") + return { + "success": False, + "error": str(e) + } + + + def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: + """直接调用不带inputs参数的API + + Args: + other_params: 其他参数字典 + + Returns: + API响应数据 + """ + payload = { + "inputs": {}, + "response_mode": "blocking", + "user": "admin" + } + + # 添加其他参数 + if other_params: + payload.update(other_params) + + try: + logger.info(f"调用不带inputs参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") + response = requests.post( + self.api_url, + headers=self.headers, + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value + ) + + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + + response.raise_for_status() + logger.info(f"API调用成功,状态码: {response.status_code}") + + return { + "success": True, + "status_code": response.status_code, + "data": response.json() if response.content else {} + } + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } + except requests.exceptions.RequestException as e: + logger.error(f"API调用失败: {e}") + return { + "success": False, + "error": str(e) + } + + + def call_api_with_query(self, query: str = None) -> Dict[str, Any]: + """直接调用带inputs参数的API + + Args: + query: 查询语句 + + Returns: + API响应数据 + """ + payload = { + "inputs": {}, + "query" : query, + "response_mode": "blocking", # Changed from streaming to blocking for consistency + "user": "admin" + } + + try: + logger.info(f"调用带query参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") + response = requests.post( + self.api_url, + headers=self.headers, + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value + ) + + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + + response.raise_for_status() + logger.info(f"API调用成功,状态码: {response.status_code}") + + return { + "success": True, + "status_code": response.status_code, + "data": response.json() if response.content else {} + } + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } + except requests.exceptions.RequestException as e: + logger.error(f"API调用失败: {e}") + return { + "success": False, + "error": str(e) + } + + def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: + """直接调用不带inputs参数的API + + Args: + other_params: 其他参数字典 + + Returns: + API响应数据 + """ + payload = { + "inputs": {}, + "query":"", + "response_mode": "blocking", # Changed from streaming to blocking for consistency + "user": "admin" + } + + # 添加其他参数 + if other_params: + payload.update(other_params) + + try: + logger.info(f"调用不带query参数的API,payload: {json.dumps(payload, ensure_ascii=False)}") + response = requests.post( + self.api_url, + headers=self.headers, + json=payload, # Using json parameter instead of data + timeout=300 # Reduced timeout to more reasonable value + ) + + logger.info(f"API调用完成,状态码: {response.status_code}") + logger.info(f"Response content: {response.text[:500]}") # Log first 500 chars of response + + response.raise_for_status() + logger.info(f"API调用成功,状态码: {response.status_code}") + + return { + "success": True, + "status_code": response.status_code, + "data": response.json() if response.content else {} + } + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") + return { + "success": False, + "error": f"HTTP {e.response.status_code}: {e.response.text}", + "status_code": e.response.status_code + } + except requests.exceptions.RequestException as e: + logger.error(f"API调用失败: {e}") + return { + "success": False, + "error": str(e) + } + +class DatabaseManager: + """数据库管理类""" + + def __init__(self, db_config: Dict[str, str]): + self.db_config = db_config + self.connection = None + + def connect(self): + """连接数据库""" + try: + self.connection = psycopg2.connect( + host=self.db_config.get('host', 'localhost'), + port=self.db_config.get('port', '5432'), + database=self.db_config.get('database', 'postgres'), + user=self.db_config.get('user', 'postgres'), + password=self.db_config.get('password', ''), + cursor_factory=RealDictCursor + ) + logger.info("数据库连接成功") + except Exception as e: + logger.error(f"数据库连接失败: {e}") + raise + + def disconnect(self): + """断开数据库连接""" + if self.connection: + self.connection.close() + logger.info("数据库连接已关闭") + + def get_urls_from_database(self, query: str = None) -> List[str]: + """从数据库获取URL列表""" + if not self.connection: + self.connect() + + # 默认查询语句,根据实际情况修改 + if query is None: + query = "SELECT 1" + + try: + with self.connection.cursor() as cursor: + cursor.execute(query) + results = cursor.fetchall() + + # 处理结果,返回字符串列表 + datas = [list(row.values())[0] for row in results] + return datas + + except Exception as e: + logger.error(f"查询数据库失败: {e}") + return [] + +def process_single_item(api_client, item_text, type, item_index, total_count): + """处理单个项目""" + logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item_text}") + + try: + if type == 'workflow': + # 方法1: 使用带inputs参数的调用 + inputs_data = { + 'companys': f"{item_text}" + } + print(inputs_data) + + result = api_client.call_api_with_inputs(inputs_data) + + if result['success']: + logger.info(f"数据 {item_text} 处理成功") + else: + logger.error(f"数据 {item_text} 处理失败: {result.get('error')}") + return result + + else: + #agent + query = item_text + result = api_client.call_api_with_query(query) + + if result['success']: + logger.info(f"数据 {item_text} 处理成功") + else: + logger.error(f"数据 {item_text} 处理失败: {result.get('error')}") + return result + + except Exception as e: + logger.error(f"处理数据 {item_text} 时发生异常: {e}") + return {"success": False, "error": str(e)} + +#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp +#DATABASE_SCHEMA=p70_ai_intelligence +def main(): + """主函数""" + # 配置并发数 + MAX_WORKERS = 3 # 可调整为5或10 + + # 数据库配置 + db_config = { + 'host': '124.221.232.219', + 'port': '5432', + 'database': 'daas_mpp', + 'user': 'dbuser_dba', + 'password': 'EmBRxnmmjnE3', + 'schema': 'p70_ai_intelligence' + } + + # API配置 + api_config = { + 'url': 'https://tk-agent.idgvalue.com/v1/workflows/run', + 'auth_token': 'Bearer app-pv9rLM4ukTRayP2YD60cS4k5' + } + + api_client = APIClient(api_config['url'], api_config['auth_token']) + + type = 'workflow' + + try: + flag = True + + if flag: + # 初始化 + db_manager = DatabaseManager(db_config) + custom_query = """ +WITH numbered_names AS ( + SELECT + name, + (ROW_NUMBER() OVER (ORDER BY dw_account) - 1) /5 as batch_num + FROM p30_common.v_sql_cleaned_cn_d_account_info + WHERE name NOT IN ( + SELECT "search" + FROM p70_ai_intelligence.agent_account_info + WHERE "search" IS NOT NULL + ) + and name NOT IN ( + SELECT name + FROM p70_ai_intelligence.agent_execp_account + WHERE name IS NOT NULL + ) + order by dw_account + limit 5 +) +SELECT + STRING_AGG(name, E'\n') as batched_names +FROM numbered_names +GROUP BY batch_num +ORDER BY batch_num; + +""" + + try: + # 从数据库获取URL列表 + list = db_manager.get_urls_from_database(custom_query) + + if not list: + logger.warning("未获取到URL") + return + + # 展平所有数据项为单个列表 + # all_items = [] + # for batch in list: + # items = batch.split('\n') + # all_items.extend(items) + + logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程") + + # 使用线程池并发处理 + success_count = 0 + failed_count = 0 + + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + # 提交所有任务 + future_to_item = { + executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item + for i, item in enumerate(list) + } + + # 收集结果 + for future in as_completed(future_to_item): + item = future_to_item[future] + try: + result = future.result() + if result.get("success"): + success_count += 1 + else: + failed_count += 1 + except Exception as e: + logger.error(f"处理项目 {item} 时发生异常: {e}") + failed_count += 1 + + logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}") + + except Exception as e: + logger.error(f"程序执行失败: {e}") + + finally: + db_manager.disconnect() + + else: + logger.info("调用不带inputs参数的API示例") + if type == 'workflow': + result2 = api_client.call_api_without_inputs() + + if result2['success']: + logger.info("不带inputs参数的API调用成功") + else: + logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") + + else: + #agent + result2 = api_client.call_api_without_query() + + if result2['success']: + logger.info("不带inputs参数的API调用成功") + else: + logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") + + except Exception as e: + logger.error(f"初始化失败: {e}") + + + +if __name__ == "__main__": main() \ No newline at end of file