"""Batch-process recruitment data.

Aggregates per-document full text from Dify's ``document_segments`` table
(segments created in the last 48 hours) and submits each document to a Dify
workflow API, one request at a time.

Run directly; configuration is hard-coded in ``main()``.
"""

import requests
import json
import psycopg2
from psycopg2.extras import RealDictCursor
import time
from typing import List, Dict, Any, Optional
import logging

# Timestamped stdout logging so long batch runs can be followed live.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class APIClient:
    """Thin wrapper around the Dify workflow/agent HTTP API."""

    def __init__(self, api_url: str, auth_token: str):
        """
        Args:
            api_url: full endpoint URL (e.g. ``.../v1/workflows/run``).
            auth_token: Authorization header value (``Bearer app-...``).
        """
        self.api_url = api_url
        self.headers = {
            'Authorization': auth_token,
            'Content-Type': 'application/json'
        }

    def _post(self, payload: Dict[str, Any], timeout: int) -> Dict[str, Any]:
        """POST *payload* as JSON and normalize the outcome.

        Returns ``{"success": True, "status_code", "data"}`` on 2xx, or
        ``{"success": False, "error"}`` on any transport/HTTP error.
        (Shared helper: the four public call methods were copy-pasted
        variations of this same request/response handling.)
        """
        try:
            response = requests.post(
                self.api_url,
                headers=self.headers,
                data=json.dumps(payload),
                timeout=timeout
            )
            response.raise_for_status()
            logger.info(f"API调用成功,状态码: {response.status_code}")
            return {
                "success": True,
                "status_code": response.status_code,
                "data": response.json()
            }
        except requests.exceptions.RequestException as e:
            logger.error(f"API调用失败: {e}")
            return {
                "success": False,
                "error": str(e)
            }

    def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Call the workflow API in blocking mode with *inputs*."""
        logger.info("调用带inputs参数的API")
        return self._post(
            {"inputs": inputs, "response_mode": "blocking", "user": "admin"},
            timeout=1200
        )

    def call_api_without_inputs(self, other_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Call the workflow API in blocking mode with empty inputs.

        Args:
            other_params: extra payload entries merged in (may override
                the defaults, matching the original ``dict.update`` behavior).
        """
        payload = {"inputs": {}, "response_mode": "blocking", "user": "admin"}
        if other_params:
            payload.update(other_params)
        logger.info("调用不带inputs参数的API")
        return self._post(payload, timeout=1200)

    def call_api_with_query(self, query: str = None) -> Dict[str, Any]:
        """Call the agent (chat) API with *query*.

        NOTE(review): ``response_mode`` is "streaming" yet the response is
        parsed with ``.json()``; a true SSE stream would make that raise.
        Confirm server behavior before enabling agent mode.
        """
        logger.info("调用带inputs参数的API")
        return self._post(
            {"inputs": {}, "query": query, "response_mode": "streaming", "user": "admin"},
            timeout=2400
        )

    def call_api_without_query(self, other_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Call the agent API with an empty query; *other_params* merged in."""
        payload = {"inputs": {}, "query": "", "response_mode": "streaming", "user": "admin"}
        if other_params:
            payload.update(other_params)
        logger.info("调用不带inputs参数的API")
        return self._post(payload, timeout=2400)


class DatabaseManager:
    """Minimal psycopg2 connection holder with a single-column query helper."""

    def __init__(self, db_config: Dict[str, str]):
        self.db_config = db_config
        self.connection = None

    def connect(self):
        """Open the Postgres connection (RealDictCursor: rows come back as dicts)."""
        try:
            self.connection = psycopg2.connect(
                host=self.db_config.get('host', 'localhost'),
                port=self.db_config.get('port', '5432'),
                database=self.db_config.get('database', 'postgres'),
                user=self.db_config.get('user', 'postgres'),
                password=self.db_config.get('password', ''),
                cursor_factory=RealDictCursor
            )
            logger.info("数据库连接成功")
        except Exception as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def disconnect(self):
        """Close the connection if one was opened."""
        if self.connection:
            self.connection.close()
            logger.info("数据库连接已关闭")

    def get_urls_from_database(self, query: str = None) -> List[str]:
        """Run *query* and return the first column of every row.

        Connects lazily on first use. Returns ``[]`` on any query error
        (best-effort; the error is logged).
        """
        if not self.connection:
            self.connect()
        if query is None:
            query = "SELECT 1"
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()
            # RealDictCursor rows are dicts; keep only the first column value.
            return [list(row.values())[0] for row in rows]
        except Exception as e:
            logger.error(f"查询数据库失败: {e}")
            return []


def main():
    """Fetch per-document full texts updated in the last 48h and submit each
    to the workflow API tagged as recruitment data."""
    # NOTE(review): credentials and tokens are hard-coded; move them to
    # environment variables or a config file before sharing this script.
    db_config = {
        'host': '124.221.232.219',
        'port': '5432',
        'database': 'dify',
        'user': 'dbuser_dba',
        'password': 'EmBRxnmmjnE3',
        'schema': 'p70_ai_intelligence'
    }
    api_config = {
        'url': 'https://tk-agent.idgvalue.com/v1/workflows/run',
        'auth_token': 'Bearer app-siNKLRExkzO9IM81P8hHuBx2'
    }
    api_client = APIClient(api_config['url'], api_config['auth_token'])

    # 'workflow' -> /workflows/run with inputs; 'agent' -> chat API with query.
    # (Was a shadowed builtin ``type`` assigned twice; only 'workflow' ever ran.)
    mode = 'workflow'

    db_manager = DatabaseManager(db_config)
    custom_query = """SELECT
    string_agg(ds.content, '' ORDER BY ds.position) AS full_text
FROM public.document_segments ds
WHERE ds.dataset_id = 'b1b53be2-3407-445f-b7a4-a6e2c717963d'
  AND ds.enabled
  AND ds.created_at >= NOW() - INTERVAL '48 hours'
GROUP BY ds.document_id
"""

    try:
        # One aggregated full_text per document (was ``list``, shadowing the builtin).
        texts = db_manager.get_urls_from_database(custom_query)
        if not texts:
            logger.warning("未获取到URL")
            return

        total = len(texts)
        for i, full_text in enumerate(texts, 1):
            logger.info(f"处理第 {i}/{total} 个数据: {full_text}")

            if mode == 'workflow':
                result = api_client.call_api_with_inputs({
                    "content": f"{full_text}",
                    "type": "招聘信息"
                })
            else:
                # agent mode: send the raw text as the chat query
                result = api_client.call_api_with_query(full_text)

            if result['success']:
                logger.info(f"URL {full_text} 处理成功")
            else:
                logger.error(f"URL {full_text} 处理失败: {result.get('error')}")

            # Throttle between requests. BUGFIX: the original tested
            # ``i < len(text)`` — the length of the *current string* —
            # instead of the number of items.
            if i < total:
                time.sleep(1)
    except Exception as e:
        logger.error(f"程序执行失败: {e}")
    finally:
        db_manager.disconnect()


if __name__ == "__main__":
    main()
"""Batch-process crawler data.

Aggregates per-document full text from Dify's ``document_segments`` table
(segments created in the last 48 hours, crawler dataset) and submits each
document to a Dify workflow API, one request at a time.

Run directly; configuration is hard-coded in ``main()``.
"""

import requests
import json
import psycopg2
from psycopg2.extras import RealDictCursor
import time
from typing import List, Dict, Any, Optional
import logging

# Timestamped stdout logging so long batch runs can be followed live.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class APIClient:
    """Thin wrapper around the Dify workflow/agent HTTP API."""

    def __init__(self, api_url: str, auth_token: str):
        """
        Args:
            api_url: full endpoint URL (e.g. ``.../v1/workflows/run``).
            auth_token: Authorization header value (``Bearer app-...``).
        """
        self.api_url = api_url
        self.headers = {
            'Authorization': auth_token,
            'Content-Type': 'application/json'
        }

    def _post(self, payload: Dict[str, Any], timeout: int) -> Dict[str, Any]:
        """POST *payload* as JSON and normalize the outcome.

        Returns ``{"success": True, "status_code", "data"}`` on 2xx, or
        ``{"success": False, "error"}`` on any transport/HTTP error.
        (Shared helper: the four public call methods were copy-pasted
        variations of this same request/response handling.)
        """
        try:
            response = requests.post(
                self.api_url,
                headers=self.headers,
                data=json.dumps(payload),
                timeout=timeout
            )
            response.raise_for_status()
            logger.info(f"API调用成功,状态码: {response.status_code}")
            return {
                "success": True,
                "status_code": response.status_code,
                "data": response.json()
            }
        except requests.exceptions.RequestException as e:
            logger.error(f"API调用失败: {e}")
            return {
                "success": False,
                "error": str(e)
            }

    def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Call the workflow API in blocking mode with *inputs*.

        Logs the serialized payload before sending (this script, unlike its
        recruitment sibling, traces the outgoing payload).
        """
        payload = {"inputs": inputs, "response_mode": "blocking", "user": "admin"}
        logger.info(f"调用API,payload: {json.dumps(payload)}")
        logger.info("调用带inputs参数的API")
        return self._post(payload, timeout=2400)

    def call_api_without_inputs(self, other_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Call the workflow API in blocking mode with empty inputs.

        Args:
            other_params: extra payload entries merged in (may override
                the defaults, matching the original ``dict.update`` behavior).
        """
        payload = {"inputs": {}, "response_mode": "blocking", "user": "admin"}
        if other_params:
            payload.update(other_params)
        logger.info("调用不带inputs参数的API")
        return self._post(payload, timeout=1200)

    def call_api_with_query(self, query: str = None) -> Dict[str, Any]:
        """Call the agent (chat) API with *query*.

        NOTE(review): ``response_mode`` is "streaming" yet the response is
        parsed with ``.json()``; a true SSE stream would make that raise.
        Confirm server behavior before enabling agent mode.
        """
        logger.info("调用带inputs参数的API")
        return self._post(
            {"inputs": {}, "query": query, "response_mode": "streaming", "user": "admin"},
            timeout=2400
        )

    def call_api_without_query(self, other_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Call the agent API with an empty query; *other_params* merged in."""
        payload = {"inputs": {}, "query": "", "response_mode": "streaming", "user": "admin"}
        if other_params:
            payload.update(other_params)
        logger.info("调用不带inputs参数的API")
        return self._post(payload, timeout=2400)


class DatabaseManager:
    """Minimal psycopg2 connection holder with a single-column query helper."""

    def __init__(self, db_config: Dict[str, str]):
        self.db_config = db_config
        self.connection = None

    def connect(self):
        """Open the Postgres connection (RealDictCursor: rows come back as dicts)."""
        try:
            self.connection = psycopg2.connect(
                host=self.db_config.get('host', 'localhost'),
                port=self.db_config.get('port', '5432'),
                database=self.db_config.get('database', 'postgres'),
                user=self.db_config.get('user', 'postgres'),
                password=self.db_config.get('password', ''),
                cursor_factory=RealDictCursor
            )
            logger.info("数据库连接成功")
        except Exception as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def disconnect(self):
        """Close the connection if one was opened."""
        if self.connection:
            self.connection.close()
            logger.info("数据库连接已关闭")

    def get_urls_from_database(self, query: str = None) -> List[str]:
        """Run *query* and return the first column of every row.

        Connects lazily on first use. Returns ``[]`` on any query error
        (best-effort; the error is logged).
        """
        if not self.connection:
            self.connect()
        if query is None:
            query = "SELECT 1"
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()
            # RealDictCursor rows are dicts; keep only the first column value.
            return [list(row.values())[0] for row in rows]
        except Exception as e:
            logger.error(f"查询数据库失败: {e}")
            return []


def main():
    """Fetch per-document full texts updated in the last 48h and submit each
    to the workflow API tagged as crawler data."""
    # NOTE(review): credentials and tokens are hard-coded; move them to
    # environment variables or a config file before sharing this script.
    db_config = {
        'host': '124.221.232.219',
        'port': '5432',
        'database': 'dify',
        'user': 'dbuser_dba',
        'password': 'EmBRxnmmjnE3',
        'schema': 'p70_ai_intelligence'
    }
    api_config = {
        'url': 'https://tk-agent.idgvalue.com/v1/workflows/run',
        'auth_token': 'Bearer app-siNKLRExkzO9IM81P8hHuBx2'
    }
    api_client = APIClient(api_config['url'], api_config['auth_token'])

    # 'workflow' -> /workflows/run with inputs; 'agent' -> chat API with query.
    # (Was a shadowed builtin ``type`` assigned twice; only 'workflow' ever ran.)
    mode = 'workflow'

    db_manager = DatabaseManager(db_config)
    custom_query = """SELECT
    string_agg(ds.content, '' ORDER BY ds.position) AS full_text
FROM public.document_segments ds
WHERE ds.dataset_id = '8d65631e-0aaf-41b4-be15-7dacb7529dd8'
  AND ds.enabled
  AND ds.created_at >= NOW() - INTERVAL '48 hours'
GROUP BY ds.document_id
"""

    try:
        # One aggregated full_text per document (was ``list``, shadowing the builtin).
        texts = db_manager.get_urls_from_database(custom_query)
        if not texts:
            logger.warning("未获取到URL")
            return

        total = len(texts)
        # BUGFIX: the original used ``enumerate(list, 3)`` — the progress
        # counter started at 3, mislabeling "第 i/total" and skewing the
        # throttle condition. Counting starts at 1.
        for i, full_text in enumerate(texts, 1):
            logger.info(f"处理第 {i}/{total} 个数据: {full_text}")

            if mode == 'workflow':
                result = api_client.call_api_with_inputs({
                    'content': f"{full_text}",
                    'type': "爬虫信息"
                })
            else:
                # agent mode: send the raw text as the chat query
                result = api_client.call_api_with_query(full_text)

            if result['success']:
                logger.info(f"URL {full_text} 处理成功")
            else:
                logger.error(f"URL {full_text} 处理失败: {result.get('error')}")

            # Throttle between requests. BUGFIX: the original tested
            # ``i < len(text)`` — the length of the *current string* —
            # instead of the number of items.
            if i < total:
                time.sleep(1)
    except Exception as e:
        logger.error(f"程序执行失败: {e}")
    finally:
        db_manager.disconnect()


if __name__ == "__main__":
    main()