diff --git a/dev/workflow/TK_Cust/agents_market_newsletter/市场-Agents调度/wf_dag_agents_market_newsletter.py b/dev/workflow/TK_Cust/agents_market_newsletter/市场-Agents调度/wf_dag_agents_market_newsletter.py
index 1e12789..25f8a43 100644
--- a/dev/workflow/TK_Cust/agents_market_newsletter/市场-Agents调度/wf_dag_agents_market_newsletter.py
+++ b/dev/workflow/TK_Cust/agents_market_newsletter/市场-Agents调度/wf_dag_agents_market_newsletter.py
@@ -24,9 +24,9 @@ default_args = {
 }
 dag = DAG('wf_dag_agents_market_newsletter', default_args=default_args,
-schedule_interval=None,
+schedule_interval="0 18 * * 5",
 catchup=False,
-dagrun_timeout=timedelta(minutes=1440),
+dagrun_timeout=timedelta(minutes=160),
 max_active_runs=3,
 tags=['64cbd1bbced14209b5e3a879f89e8ab1','TK_Cust','市场-Agents调度']
 )
@@ -39,7 +39,7 @@ task_failed = EmailOperator (
 cc=[""],
 subject="agents_market_newsletter_failed",
 html_content='

 您好,agents_market_newsletter作业失败,请及时处理"

 ')
-
+
 agents_recuriments = SSHOperator(
 ssh_hook=sshHook,
@@ -49,7 +49,7 @@
 params={'my_param':"agents_recuriments"},
 depends_on_past=False,
 retries=1,
 dag=dag)
-
+
 agents_market_summar = SSHOperator(
 ssh_hook=sshHook,
@@ -59,7 +59,7 @@
 params={'my_param':"agents_market_summar"},
 depends_on_past=False,
 retries=1,
 dag=dag)
-
+
 tk_product_prefer = SSHOperator(
 ssh_hook=sshHook,
@@ -69,7 +69,29 @@
 params={'my_param':"tk_product_prefer"},
 depends_on_past=False,
 retries=1,
 dag=dag)
+
-agents_market_summar >> tk_product_prefer
-agents_recuriments >> tk_product_prefer
-tk_product_prefer >> task_failed
+batch_gen_crawer = SSHOperator(
+ssh_hook=sshHook,
+task_id='batch_gen_crawer',
+command='cd /data/airflow/etl/agents && python batch_gen_crawer.py',
+params={'my_param':"batch_gen_crawer"},
+depends_on_past=False,
+retries=1,
+dag=dag)
+
+
+batch_gen_recruiment = SSHOperator(
+ssh_hook=sshHook,
+task_id='batch_gen_recruiment',
+command='cd /data/airflow/etl/agents && python batch_gen_recruiment.py',
+params={'my_param':"batch_gen_recruiment"},
+depends_on_past=False,
+retries=1,
+dag=dag)
+
+agents_market_summar >> tk_product_prefer
+agents_recuriments >> tk_product_prefer
+agents_market_summar >> batch_gen_crawer
+agents_recuriments >> batch_gen_recruiment
+batch_gen_recruiment >> task_failed
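Review note on the dependency change above: batch_gen_recruiment >> task_failed keeps the notification task downstream of a single branch, and the definition of task_failed (including its trigger rule) sits outside the hunks shown here. If the email is meant to fire whenever any task in the DAG fails, a minimal sketch of one way to wire it follows; the trigger rule, recipient, and fan-in are assumptions, not the committed code.

from airflow.operators.email import EmailOperator
from airflow.utils.trigger_rule import TriggerRule

# Hypothetical variant of the notifier: ONE_FAILED makes it run as soon as any
# direct upstream task fails, instead of only after batch_gen_recruiment succeeds.
task_failed = EmailOperator(
    task_id='task_failed',
    to=['ops@example.com'],                  # placeholder recipient
    subject='agents_market_newsletter_failed',
    html_content='您好,agents_market_newsletter作业失败,请及时处理',
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag)

# Fan every leaf task into the notifier so a failure anywhere in the graph is reported.
[tk_product_prefer, batch_gen_crawer, batch_gen_recruiment] >> task_failed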
diff --git a/dev/workflow/TK_Cust/agents_market_newsletter/批量处理招聘数据/batch_gen_recruiment.py b/dev/workflow/TK_Cust/agents_market_newsletter/批量处理招聘数据/batch_gen_recruiment.py
new file mode 100644
index 0000000..404f922
--- /dev/null
+++ b/dev/workflow/TK_Cust/agents_market_newsletter/批量处理招聘数据/batch_gen_recruiment.py
@@ -0,0 +1,135 @@
+"""批量从 document_segments 聚合并调用本地 text_import 接口的脚本
+
+用法示例(在项目根目录运行):
+python scripts/batch_import_from_segments.py
+
+说明:脚本会把每个 document 的拼接段落(full_text)作为 text_content,按顺序逐条调用接口。
+配置文件路径默认为 scripts/config.json
+"""
+
+import logging
+import time
+import json
+import os
+from typing import Optional
+
+import requests
+from sqlalchemy import create_engine, text
+
+logger = logging.getLogger("batch_import")
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
+
+DEFAULT_SQL = """
+SELECT
+    ds.document_id,
+    MIN(ds.created_at) AS doc_time,
+    string_agg(ds.content, '' ORDER BY ds.position) AS full_text
+FROM public.document_segments ds
+WHERE ds.dataset_id = 'b1b53be2-3407-445f-b7a4-a6e2c717963d'
+  AND ds.enabled
+  AND ds.created_at >= NOW() - INTERVAL '24 hours'
+GROUP BY ds.document_id
+ORDER BY doc_time DESC
+
+"""
+
+
+def init_db_from_config(database_url: str):
+    """根据配置中的数据库URL初始化数据库连接"""
+    engine = create_engine(
+        database_url,
+        pool_size=5,
+        max_overflow=10,
+        echo=False,
+        pool_pre_ping=True
+    )
+    return engine
+
+
+def fetch_documents(engine, dataset_id: str, limit: Optional[int] = None, offset: Optional[int] = None):
+    sql = DEFAULT_SQL
+    if limit:
+        sql = sql + f"\nLIMIT {int(limit)}"
+    if offset:
+        sql = sql + f"\nOFFSET {int(offset)}"
+
+    with engine.connect() as conn:
+        result = conn.execute(text(sql), {"dataset_id": dataset_id})
+        rows = result.fetchall()
+    return rows
+
+
+def call_import_api(api_url: str, text_content: str, timeout: int = 2400):
+    payload = {
+        "text_content": text_content
+    }
+
+
+    headers = {"Content-Type": "application/json", "Accept": "application/json"}
+    resp = requests.post(api_url, json=payload, headers=headers, timeout=timeout)
+    resp.raise_for_status()
+    return resp.json()
+
+
+def load_config(path: str) -> dict:
+    try:
+        with open(path, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    except Exception as e:
+        logger.error(f"加载 config 文件失败: {e}")
+        return {}
+
+
+def main():
+
+    # 从配置文件中读取所有参数
+    dataset_id = 'b1b53be2-3407-445f-b7a4-a6e2c717963d'
+    api_url = 'http://10.168.1.163:8099/api/tools/import_recruitment'
+
+    limit =10000
+    offset = 0
+    delay = 0.5
+    database_url = 'postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/dify'
+
+
+
+    logger.info("初始化数据库连接")
+    engine = init_db_from_config(database_url)
+
+    logger.info("开始查询并批量调用 text_import 接口")
+    rows = fetch_documents(engine, dataset_id, limit=limit, offset=offset)
+    total = len(rows)
+    logger.info(f"查询到 {total} 条 document 记录(按 document_id 聚合)")
+
+    success = 0
+    failed = 0
+
+    for idx, row in enumerate(rows, start=1):
+        document_id = row[0]
+        doc_time = row[1]
+        full_text = row[2] or ""
+
+        logger.info(f"[{idx}/{total}] document_id={document_id} doc_time={doc_time}")
+
+        try:
+            res = call_import_api(
+                api_url=api_url,
+                text_content=full_text,
+                timeout=240,
+            )
+            logger.info(f" → API 返回 success={res.get('success')} message={res.get('message')}")
+            if res.get('success'):
+                success += 1
+            else:
+                failed += 1
+        except Exception as e:
+            failed += 1
+            logger.exception(f" ✗ 调用 API 失败: {e}")
+
+        time.sleep(delay)
+
+    logger.info(f"完成:成功 {success},失败 {failed},总计 {total}")
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
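A note on batch_gen_recruiment.py as committed: fetch_documents accepts a dataset_id and passes it to conn.execute, but DEFAULT_SQL hardcodes the UUID, so the bind value is never used, and LIMIT/OFFSET are spliced in by string formatting. Below is a minimal sketch of a parameterized variant, assuming the same SQLAlchemy stack; PARAM_SQL and the reworked signature are introduced here for illustration and are not part of the committed script.

from sqlalchemy import text

# All three values become real bind parameters, so one query template can serve
# both the recruitment and the crawler dataset.
PARAM_SQL = text("""
SELECT
    ds.document_id,
    MIN(ds.created_at) AS doc_time,
    string_agg(ds.content, '' ORDER BY ds.position) AS full_text
FROM public.document_segments ds
WHERE ds.dataset_id = :dataset_id
  AND ds.enabled
  AND ds.created_at >= NOW() - INTERVAL '24 hours'
GROUP BY ds.document_id
ORDER BY doc_time DESC
LIMIT :limit OFFSET :offset
""")


def fetch_documents(engine, dataset_id: str, limit: int = 10000, offset: int = 0):
    # engine is the SQLAlchemy Engine returned by init_db_from_config above.
    with engine.connect() as conn:
        result = conn.execute(
            PARAM_SQL,
            {"dataset_id": dataset_id, "limit": limit, "offset": offset},
        )
        return result.fetchall()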
diff --git a/dev/workflow/TK_Cust/agents_market_newsletter/批量处理爬虫数据/batch_gen_crawer.py b/dev/workflow/TK_Cust/agents_market_newsletter/批量处理爬虫数据/batch_gen_crawer.py
new file mode 100644
index 0000000..c854cac
--- /dev/null
+++ b/dev/workflow/TK_Cust/agents_market_newsletter/批量处理爬虫数据/batch_gen_crawer.py
@@ -0,0 +1,135 @@
+"""批量从 document_segments 聚合并调用本地 text_import 接口的脚本
+
+用法示例(在项目根目录运行):
+python scripts/batch_import_from_segments.py
+
+说明:脚本会把每个 document 的拼接段落(full_text)作为 text_content,按顺序逐条调用接口。
+配置文件路径默认为 scripts/config.json
+"""
+
+import logging
+import time
+import json
+import os
+from typing import Optional
+
+import requests
+from sqlalchemy import create_engine, text
+
+logger = logging.getLogger("batch_import")
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
+
+DEFAULT_SQL = """
+SELECT
+    ds.document_id,
+    MIN(ds.created_at) AS doc_time,
+    string_agg(ds.content, '' ORDER BY ds.position) AS full_text
+FROM public.document_segments ds
+WHERE ds.dataset_id = '8d65631e-0aaf-41b4-be15-7dacb7529dd8'
+  AND ds.enabled
+  AND ds.created_at >= NOW() - INTERVAL '24 hours'
+GROUP BY ds.document_id
+ORDER BY doc_time DESC
+
+"""
+
+
+def init_db_from_config(database_url: str):
+    """根据配置中的数据库URL初始化数据库连接"""
+    engine = create_engine(
+        database_url,
+        pool_size=5,
+        max_overflow=10,
+        echo=False,
+        pool_pre_ping=True
+    )
+    return engine
+
+
+def fetch_documents(engine, dataset_id: str, limit: Optional[int] = None, offset: Optional[int] = None):
+    sql = DEFAULT_SQL
+    if limit:
+        sql = sql + f"\nLIMIT {int(limit)}"
+    if offset:
+        sql = sql + f"\nOFFSET {int(offset)}"
+
+    with engine.connect() as conn:
+        result = conn.execute(text(sql), {"dataset_id": dataset_id})
+        rows = result.fetchall()
+    return rows
+
+
+def call_import_api(api_url: str, text_content: str, timeout: int = 2400):
+    payload = {
+        "text_content": text_content
+    }
+
+
+    headers = {"Content-Type": "application/json", "Accept": "application/json"}
+    resp = requests.post(api_url, json=payload, headers=headers, timeout=timeout)
+    resp.raise_for_status()
+    return resp.json()
+
+
+def load_config(path: str) -> dict:
+    try:
+        with open(path, 'r', encoding='utf-8') as f:
+            return json.load(f)
+    except Exception as e:
+        logger.error(f"加载 config 文件失败: {e}")
+        return {}
+
+
+def main():
+
+    # 从配置文件中读取所有参数
+    dataset_id = '8d65631e-0aaf-41b4-be15-7dacb7529dd8'
+    api_url = 'http://10.168.1.163:8099/api/tools/import_crawer'
+
+    limit =10000
+    offset = 0
+    delay = 0.5
+    database_url = 'postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/dify'
+
+
+
+    logger.info("初始化数据库连接")
+    engine = init_db_from_config(database_url)
+
+    logger.info("开始查询并批量调用 text_import 接口")
+    rows = fetch_documents(engine, dataset_id, limit=limit, offset=offset)
+    total = len(rows)
+    logger.info(f"查询到 {total} 条 document 记录(按 document_id 聚合)")
+
+    success = 0
+    failed = 0
+
+    for idx, row in enumerate(rows, start=1):
+        document_id = row[0]
+        doc_time = row[1]
+        full_text = row[2] or ""
+
+        logger.info(f"[{idx}/{total}] document_id={document_id} doc_time={doc_time}")
+
+        try:
+            res = call_import_api(
+                api_url=api_url,
+                text_content=full_text,
+                timeout=240,
+            )
+            logger.info(f" → API 返回 success={res.get('success')} message={res.get('message')}")
+            if res.get('success'):
+                success += 1
+            else:
+                failed += 1
+        except Exception as e:
+            failed += 1
+            logger.exception(f" ✗ 调用 API 失败: {e}")
+
+        time.sleep(delay)
+
+    logger.info(f"完成:成功 {success},失败 {failed},总计 {total}")
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
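batch_gen_crawer.py and batch_gen_recruiment.py differ only in the dataset_id and the API endpoint. The sketch below shows one way the two entry points could share a single module; the module name, the JOBS table, and the DIFY_DATABASE_URL environment variable are assumptions (the committed scripts keep the connection string inline), and the helpers are the ones defined in the scripts above.

import os
import sys

# Hypothetical shared module re-exporting the helpers defined in the two scripts above.
from batch_common import init_db_from_config, fetch_documents, call_import_api

JOBS = {
    "recruitment": {
        "dataset_id": "b1b53be2-3407-445f-b7a4-a6e2c717963d",
        "api_url": "http://10.168.1.163:8099/api/tools/import_recruitment",
    },
    "crawler": {
        "dataset_id": "8d65631e-0aaf-41b4-be15-7dacb7529dd8",
        "api_url": "http://10.168.1.163:8099/api/tools/import_crawer",
    },
}


def run_batch(job_name: str) -> None:
    cfg = JOBS[job_name]
    # Hypothetical env var; keeps the PostgreSQL credentials out of the source tree.
    engine = init_db_from_config(os.environ["DIFY_DATABASE_URL"])
    for _, _, full_text in fetch_documents(engine, cfg["dataset_id"], limit=10000, offset=0):
        call_import_api(cfg["api_url"], full_text or "", timeout=240)


if __name__ == "__main__":
    run_batch(sys.argv[1])   # e.g. python batch_import.py recruitment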