add workflow 市场-Agents调度,dev

root 2025-11-25 19:26:57 +08:00
parent cfed720db5
commit fdf3dc0363
3 changed files with 300 additions and 8 deletions

View File

@@ -24,9 +24,9 @@ default_args = {
 }
 dag = DAG('wf_dag_agents_market_newsletter', default_args=default_args,
-schedule_interval=None,
+schedule_interval="0 18 * * 5",
 catchup=False,
-dagrun_timeout=timedelta(minutes=1440),
+dagrun_timeout=timedelta(minutes=160),
 max_active_runs=3,
 tags=['64cbd1bbced14209b5e3a879f89e8ab1','TK_Cust','市场-Agents调度']
 )
@@ -39,7 +39,7 @@ task_failed = EmailOperator (
cc=[""],
subject="agents_market_newsletter_failed",
html_content='<h3>您好agents_market_newsletter作业失败请及时处理" </h3>')
agents_recuriments = SSHOperator(
ssh_hook=sshHook,
@@ -49,7 +49,7 @@ params={'my_param':"agents_recuriments"},
depends_on_past=False,
retries=1,
dag=dag)
agents_market_summar = SSHOperator(
ssh_hook=sshHook,
@@ -59,7 +59,7 @@ params={'my_param':"agents_market_summar"},
depends_on_past=False,
retries=1,
dag=dag)
tk_product_prefer = SSHOperator(
ssh_hook=sshHook,
@@ -69,7 +69,29 @@ params={'my_param':"tk_product_prefer"},
 depends_on_past=False,
 retries=1,
 dag=dag)
-agents_market_summar >> tk_product_prefer
-agents_recuriments >> tk_product_prefer
-tk_product_prefer >> task_failed
+batch_gen_crawer = SSHOperator(
+ssh_hook=sshHook,
+task_id='batch_gen_crawer',
+command='cd /data/airflow/etl/agents && python batch_gen_crawer.py',
+params={'my_param':"batch_gen_crawer"},
+depends_on_past=False,
+retries=1,
+dag=dag)
+batch_gen_recruiment = SSHOperator(
+ssh_hook=sshHook,
+task_id='batch_gen_recruiment',
+command='cd /data/airflow/etl/agents && python batch_gen_recruiment.py',
+params={'my_param':"batch_gen_recruiment"},
+depends_on_past=False,
+retries=1,
+dag=dag)
+agents_market_summar >> tk_product_prefer
+agents_recuriments >> tk_product_prefer
+agents_market_summar >> batch_gen_crawer
+agents_recuriments >> batch_gen_recruiment
+batch_gen_recruiment >> task_failed
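
This hunk switches the DAG from manual triggering (schedule_interval=None) to a weekly cron schedule "0 18 * * 5" (18:00 every Friday), tightens dagrun_timeout from 1440 to 160 minutes, and wires the two new batch_gen_* tasks downstream of agents_market_summar and agents_recuriments, with task_failed now downstream of batch_gen_recruiment instead of tk_product_prefer. As a quick sanity check of the cron expression, the sketch below previews the next few trigger points; it assumes croniter is available (it ships as an Airflow dependency) and the start date is arbitrary. With a cron schedule, Airflow launches each run at the end of its data interval, so the first run fires on the first Friday 18:00 after the schedule takes effect.

# Sketch: preview the next few trigger points of the new schedule.
# croniter is an Airflow dependency; the start date below is arbitrary.
from datetime import datetime
from croniter import croniter

cron = "0 18 * * 5"  # new schedule_interval: 18:00 every Friday
it = croniter(cron, datetime(2025, 11, 25))
for _ in range(3):
    print(it.get_next(datetime))  # 2025-11-28 18:00, 2025-12-05 18:00, 2025-12-12 18:00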

View File

@@ -0,0 +1,135 @@
"""批量从 document_segments 聚合并调用本地 text_import 接口的脚本
用法示例在项目根目录运行
python scripts/batch_import_from_segments.py
说明脚本会把每个 document 的拼接段落full_text作为 text_content按顺序逐条调用接口
配置文件路径默认为 scripts/config.json
"""
import logging
import time
import json
import os
from typing import Optional
import requests
from sqlalchemy import create_engine, text
logger = logging.getLogger("batch_import")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
DEFAULT_SQL = """
SELECT
ds.document_id,
MIN(ds.created_at) AS doc_time,
string_agg(ds.content, '' ORDER BY ds.position) AS full_text
FROM public.document_segments ds
WHERE ds.dataset_id = 'b1b53be2-3407-445f-b7a4-a6e2c717963d'
AND ds.enabled
AND ds.created_at >= NOW() - INTERVAL '24 hours'
GROUP BY ds.document_id
ORDER BY doc_time DESC
"""
def init_db_from_config(database_url: str):
"""根据配置中的数据库URL初始化数据库连接"""
engine = create_engine(
database_url,
pool_size=5,
max_overflow=10,
echo=False,
pool_pre_ping=True
)
return engine
def fetch_documents(engine, dataset_id: str, limit: Optional[int] = None, offset: Optional[int] = None):
sql = DEFAULT_SQL
if limit:
sql = sql + f"\nLIMIT {int(limit)}"
if offset:
sql = sql + f"\nOFFSET {int(offset)}"
with engine.connect() as conn:
result = conn.execute(text(sql), {"dataset_id": dataset_id})
rows = result.fetchall()
return rows
def call_import_api(api_url: str, text_content: str, timeout: int = 2400):
payload = {
"text_content": text_content
}
headers = {"Content-Type": "application/json", "Accept": "application/json"}
resp = requests.post(api_url, json=payload, headers=headers, timeout=timeout)
resp.raise_for_status()
return resp.json()
def load_config(path: str) -> dict:
try:
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"加载 config 文件失败: {e}")
return {}
def main():
# 从配置文件中读取所有参数
dataset_id = 'b1b53be2-3407-445f-b7a4-a6e2c717963d'
api_url = 'http://10.168.1.163:8099/api/tools/import_recruitment'
limit =10000
offset = 0
delay = 0.5
database_url = 'postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/dify'
logger.info("初始化数据库连接")
engine = init_db_from_config(database_url)
logger.info("开始查询并批量调用 text_import 接口")
rows = fetch_documents(engine, dataset_id, limit=limit, offset=offset)
total = len(rows)
logger.info(f"查询到 {total} 条 document 记录(按 document_id 聚合)")
success = 0
failed = 0
for idx, row in enumerate(rows, start=1):
document_id = row[0]
doc_time = row[1]
full_text = row[2] or ""
logger.info(f"[{idx}/{total}] document_id={document_id} doc_time={doc_time}")
try:
res = call_import_api(
api_url=api_url,
text_content=full_text,
timeout=240,
)
logger.info(f" → API 返回 success={res.get('success')} message={res.get('message')}")
if res.get('success'):
success += 1
else:
failed += 1
except Exception as e:
failed += 1
logger.exception(f" ✗ 调用 API 失败: {e}")
time.sleep(delay)
logger.info(f"完成:成功 {success},失败 {failed},总计 {total}")
if __name__ == '__main__':
main()
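
Before letting the DAG run this end to end, a dry run of the query path can confirm what would be posted. The sketch below is illustrative only: it assumes the script is importable as batch_import_from_segments (the name used in its docstring; the DAG above actually invokes batch_gen_recruiment.py), and the connection string is a placeholder.

# Dry-run sketch: preview the aggregated documents without calling the import API.
# Module name and DSN are assumptions (see the note above).
from batch_import_from_segments import init_db_from_config, fetch_documents

engine = init_db_from_config("postgresql://user:password@db-host:5432/dify")  # placeholder DSN
rows = fetch_documents(engine, "b1b53be2-3407-445f-b7a4-a6e2c717963d", limit=3)
for document_id, doc_time, full_text in rows:
    # Each row is (document_id, doc_time, full_text); show a short preview of the text
    print(document_id, doc_time, (full_text or "")[:120])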

View File

@@ -0,0 +1,135 @@
"""批量从 document_segments 聚合并调用本地 text_import 接口的脚本
用法示例在项目根目录运行
python scripts/batch_import_from_segments.py
说明脚本会把每个 document 的拼接段落full_text作为 text_content按顺序逐条调用接口
配置文件路径默认为 scripts/config.json
"""
import logging
import time
import json
import os
from typing import Optional
import requests
from sqlalchemy import create_engine, text
logger = logging.getLogger("batch_import")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
DEFAULT_SQL = """
SELECT
ds.document_id,
MIN(ds.created_at) AS doc_time,
string_agg(ds.content, '' ORDER BY ds.position) AS full_text
FROM public.document_segments ds
WHERE ds.dataset_id = '8d65631e-0aaf-41b4-be15-7dacb7529dd8'
AND ds.enabled
AND ds.created_at >= NOW() - INTERVAL '24 hours'
GROUP BY ds.document_id
ORDER BY doc_time DESC
"""
def init_db_from_config(database_url: str):
"""根据配置中的数据库URL初始化数据库连接"""
engine = create_engine(
database_url,
pool_size=5,
max_overflow=10,
echo=False,
pool_pre_ping=True
)
return engine
def fetch_documents(engine, dataset_id: str, limit: Optional[int] = None, offset: Optional[int] = None):
sql = DEFAULT_SQL
if limit:
sql = sql + f"\nLIMIT {int(limit)}"
if offset:
sql = sql + f"\nOFFSET {int(offset)}"
with engine.connect() as conn:
result = conn.execute(text(sql), {"dataset_id": dataset_id})
rows = result.fetchall()
return rows
def call_import_api(api_url: str, text_content: str, timeout: int = 2400):
payload = {
"text_content": text_content
}
headers = {"Content-Type": "application/json", "Accept": "application/json"}
resp = requests.post(api_url, json=payload, headers=headers, timeout=timeout)
resp.raise_for_status()
return resp.json()
def load_config(path: str) -> dict:
try:
with open(path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"加载 config 文件失败: {e}")
return {}
def main():
# 从配置文件中读取所有参数
dataset_id = '8d65631e-0aaf-41b4-be15-7dacb7529dd8'
api_url = 'http://10.168.1.163:8099/api/tools/import_crawer'
limit =10000
offset = 0
delay = 0.5
database_url = 'postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/dify'
logger.info("初始化数据库连接")
engine = init_db_from_config(database_url)
logger.info("开始查询并批量调用 text_import 接口")
rows = fetch_documents(engine, dataset_id, limit=limit, offset=offset)
total = len(rows)
logger.info(f"查询到 {total} 条 document 记录(按 document_id 聚合)")
success = 0
failed = 0
for idx, row in enumerate(rows, start=1):
document_id = row[0]
doc_time = row[1]
full_text = row[2] or ""
logger.info(f"[{idx}/{total}] document_id={document_id} doc_time={doc_time}")
try:
res = call_import_api(
api_url=api_url,
text_content=full_text,
timeout=240,
)
logger.info(f" → API 返回 success={res.get('success')} message={res.get('message')}")
if res.get('success'):
success += 1
else:
failed += 1
except Exception as e:
failed += 1
logger.exception(f" ✗ 调用 API 失败: {e}")
time.sleep(delay)
logger.info(f"完成:成功 {success},失败 {failed},总计 {total}")
if __name__ == '__main__':
main()
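
This second script is identical to the first except for dataset_id and the target endpoint (import_crawer instead of import_recruitment). A possible follow-up, sketched here purely as an illustration and not part of this commit, is a single parameterized entry point shared by both DAG tasks; the module import, DATABASE_URL, and the TARGETS table are assumptions.

# Consolidation sketch (hypothetical, not part of this commit): one parameterized entry
# point instead of two near-identical scripts. It reuses the helpers above, assuming they
# are importable as batch_import_from_segments; DATABASE_URL is a placeholder.
import sys
import time

from batch_import_from_segments import init_db_from_config, fetch_documents, call_import_api

DATABASE_URL = "postgresql://user:password@db-host:5432/dify"  # placeholder DSN
TARGETS = {
    "recruitment": ("b1b53be2-3407-445f-b7a4-a6e2c717963d",
                    "http://10.168.1.163:8099/api/tools/import_recruitment"),
    "crawer": ("8d65631e-0aaf-41b4-be15-7dacb7529dd8",
               "http://10.168.1.163:8099/api/tools/import_crawer"),
}


def run_import(dataset_id: str, api_url: str, limit: int = 10000, delay: float = 0.5) -> None:
    # Same fetch-then-post loop as main() above, parameterized by dataset and endpoint.
    engine = init_db_from_config(DATABASE_URL)
    for row in fetch_documents(engine, dataset_id, limit=limit):
        call_import_api(api_url=api_url, text_content=row[2] or "", timeout=240)
        time.sleep(delay)


if __name__ == "__main__":
    # Usage: python batch_import.py recruitment|crawer
    dataset_id, api_url = TARGETS[sys.argv[1]]
    run_import(dataset_id, api_url)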