add workflow 市场-Agents调度,dev
This commit is contained in:
parent
20c777a13d
commit
9004489719
|
|
@ -1,135 +1,361 @@
|
||||||
"""批量从 document_segments 聚合并调用本地 text_import 接口的脚本
|
|
||||||
|
|
||||||
用法示例(在项目根目录运行):
|
|
||||||
python scripts/batch_import_from_segments.py
|
|
||||||
|
|
||||||
说明:脚本会把每个 document 的拼接段落(full_text)作为 text_content,按顺序逐条调用接口。
|
|
||||||
配置文件路径默认为 scripts/config.json
|
|
||||||
"""
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import time
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
from sqlalchemy import create_engine, text
|
import json
|
||||||
|
import psycopg2
|
||||||
|
from psycopg2.extras import RealDictCursor
|
||||||
|
import time
|
||||||
|
from typing import List, Dict, Any
|
||||||
|
import logging
|
||||||
|
|
||||||
logger = logging.getLogger("batch_import")
|
# 配置日志
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class APIClient:
|
||||||
|
"""简化的API客户端类"""
|
||||||
|
|
||||||
|
def __init__(self, api_url: str, auth_token: str):
|
||||||
|
self.api_url = api_url
|
||||||
|
self.headers = {
|
||||||
|
'Authorization': auth_token,
|
||||||
|
'Content-Type': 'application/json'
|
||||||
|
}
|
||||||
|
|
||||||
|
def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
"""直接调用带inputs参数的API
|
||||||
|
|
||||||
|
Args:
|
||||||
|
inputs: inputs参数字典
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
API响应数据
|
||||||
|
"""
|
||||||
|
payload = json.dumps({
|
||||||
|
"inputs": inputs,
|
||||||
|
"response_mode": "blocking",
|
||||||
|
"user": "admin"
|
||||||
|
})
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info("调用带inputs参数的API")
|
||||||
|
response = requests.post(
|
||||||
|
self.api_url,
|
||||||
|
headers=self.headers,
|
||||||
|
data=payload,
|
||||||
|
timeout=1200
|
||||||
|
)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
logger.info(f"API调用成功,状态码: {response.status_code}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"status_code": response.status_code,
|
||||||
|
"data": response.json()
|
||||||
|
}
|
||||||
|
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
logger.error(f"API调用失败: {e}")
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
|
||||||
|
"""直接调用不带inputs参数的API
|
||||||
|
|
||||||
|
Args:
|
||||||
|
other_params: 其他参数字典
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
API响应数据
|
||||||
|
"""
|
||||||
|
payload = {
|
||||||
|
"inputs": {},
|
||||||
|
"response_mode": "blocking",
|
||||||
|
"user": "admin"
|
||||||
|
}
|
||||||
|
|
||||||
|
# 添加其他参数
|
||||||
|
if other_params:
|
||||||
|
payload.update(other_params)
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info("调用不带inputs参数的API")
|
||||||
|
response = requests.post(
|
||||||
|
self.api_url,
|
||||||
|
headers=self.headers,
|
||||||
|
data=json.dumps(payload),
|
||||||
|
timeout=1200
|
||||||
|
)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
logger.info(f"API调用成功,状态码: {response.status_code}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"status_code": response.status_code,
|
||||||
|
"data": response.json()
|
||||||
|
}
|
||||||
|
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
logger.error(f"API调用失败: {e}")
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def call_api_with_query(self, query: str = None) -> Dict[str, Any]:
|
||||||
|
"""直接调用带inputs参数的API
|
||||||
|
|
||||||
|
Args:
|
||||||
|
query: 查询语句
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
API响应数据
|
||||||
|
"""
|
||||||
|
payload = json.dumps({
|
||||||
|
"inputs": {},
|
||||||
|
"query" : query,
|
||||||
|
"response_mode": "streaming",
|
||||||
|
"user": "admin"
|
||||||
|
})
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info("调用带inputs参数的API")
|
||||||
|
response = requests.post(
|
||||||
|
self.api_url,
|
||||||
|
headers=self.headers,
|
||||||
|
data=payload,
|
||||||
|
timeout=2400
|
||||||
|
)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
logger.info(f"API调用成功,状态码: {response.status_code}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"status_code": response.status_code,
|
||||||
|
"data": response.json()
|
||||||
|
}
|
||||||
|
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
logger.error(f"API调用失败: {e}")
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
|
||||||
|
"""直接调用不带inputs参数的API
|
||||||
|
|
||||||
|
Args:
|
||||||
|
other_params: 其他参数字典
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
API响应数据
|
||||||
|
"""
|
||||||
|
payload = {
|
||||||
|
"inputs": {},
|
||||||
|
"query":"",
|
||||||
|
"response_mode": "streaming",
|
||||||
|
"user": "admin"
|
||||||
|
}
|
||||||
|
|
||||||
|
# 添加其他参数
|
||||||
|
if other_params:
|
||||||
|
payload.update(other_params)
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info("调用不带inputs参数的API")
|
||||||
|
response = requests.post(
|
||||||
|
self.api_url,
|
||||||
|
headers=self.headers,
|
||||||
|
data=json.dumps(payload),
|
||||||
|
timeout=2400
|
||||||
|
)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
logger.info(f"API调用成功,状态码: {response.status_code}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"status_code": response.status_code,
|
||||||
|
"data": response.json()
|
||||||
|
}
|
||||||
|
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
logger.error(f"API调用失败: {e}")
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
class DatabaseManager:
|
||||||
|
"""数据库管理类"""
|
||||||
|
|
||||||
|
def __init__(self, db_config: Dict[str, str]):
|
||||||
|
self.db_config = db_config
|
||||||
|
self.connection = None
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
"""连接数据库"""
|
||||||
|
try:
|
||||||
|
self.connection = psycopg2.connect(
|
||||||
|
host=self.db_config.get('host', 'localhost'),
|
||||||
|
port=self.db_config.get('port', '5432'),
|
||||||
|
database=self.db_config.get('database', 'postgres'),
|
||||||
|
user=self.db_config.get('user', 'postgres'),
|
||||||
|
password=self.db_config.get('password', ''),
|
||||||
|
cursor_factory=RealDictCursor
|
||||||
|
)
|
||||||
|
logger.info("数据库连接成功")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"数据库连接失败: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
"""断开数据库连接"""
|
||||||
|
if self.connection:
|
||||||
|
self.connection.close()
|
||||||
|
logger.info("数据库连接已关闭")
|
||||||
|
|
||||||
|
def get_urls_from_database(self, query: str = None) -> List[str]:
|
||||||
|
"""从数据库获取URL列表"""
|
||||||
|
if not self.connection:
|
||||||
|
self.connect()
|
||||||
|
|
||||||
|
# 默认查询语句,根据实际情况修改
|
||||||
|
if query is None:
|
||||||
|
query = "SELECT 1"
|
||||||
|
|
||||||
|
try:
|
||||||
|
with self.connection.cursor() as cursor:
|
||||||
|
cursor.execute(query)
|
||||||
|
results = cursor.fetchall()
|
||||||
|
|
||||||
|
# 处理结果,返回字符串列表
|
||||||
|
datas = [list(row.values())[0] for row in results]
|
||||||
|
return datas
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"查询数据库失败: {e}")
|
||||||
|
return []
|
||||||
|
#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp
|
||||||
|
#DATABASE_SCHEMA=p70_ai_intelligence
|
||||||
|
def main():
|
||||||
|
"""主函数"""
|
||||||
|
# 数据库配置
|
||||||
|
db_config = {
|
||||||
|
'host': '124.221.232.219',
|
||||||
|
'port': '5432',
|
||||||
|
'database': 'dify',
|
||||||
|
'user': 'dbuser_dba',
|
||||||
|
'password': 'EmBRxnmmjnE3',
|
||||||
|
'schema': 'p70_ai_intelligence'
|
||||||
|
}
|
||||||
|
|
||||||
|
# API配置
|
||||||
|
api_config = {
|
||||||
|
'url': 'https://tk-agent.idgvalue.com/v1/workflows/run',
|
||||||
|
'auth_token': 'Bearer app-siNKLRExkzO9IM81P8hHuBx2'
|
||||||
|
}
|
||||||
|
|
||||||
|
api_client = APIClient(api_config['url'], api_config['auth_token'])
|
||||||
|
|
||||||
|
type = 'agent'
|
||||||
|
type = 'workflow'
|
||||||
|
|
||||||
|
try:
|
||||||
|
flag = True
|
||||||
|
|
||||||
|
if flag:
|
||||||
|
# 初始化
|
||||||
|
db_manager = DatabaseManager(db_config)
|
||||||
|
custom_query = """ SELECT
|
||||||
|
|
||||||
DEFAULT_SQL = """
|
|
||||||
SELECT
|
|
||||||
ds.document_id,
|
|
||||||
MIN(ds.created_at) AS doc_time,
|
|
||||||
string_agg(ds.content, '' ORDER BY ds.position) AS full_text
|
string_agg(ds.content, '' ORDER BY ds.position) AS full_text
|
||||||
FROM public.document_segments ds
|
FROM public.document_segments ds
|
||||||
WHERE ds.dataset_id = 'b1b53be2-3407-445f-b7a4-a6e2c717963d'
|
WHERE ds.dataset_id = 'b1b53be2-3407-445f-b7a4-a6e2c717963d'
|
||||||
AND ds.enabled
|
AND ds.enabled
|
||||||
AND ds.created_at >= NOW() - INTERVAL '24 hours'
|
AND ds.created_at >= NOW() - INTERVAL '48 hours'
|
||||||
GROUP BY ds.document_id
|
GROUP BY ds.document_id
|
||||||
ORDER BY doc_time DESC
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 从数据库获取URL列表
|
||||||
|
list = db_manager.get_urls_from_database(custom_query)
|
||||||
|
|
||||||
def init_db_from_config(database_url: str):
|
if not list:
|
||||||
"""根据配置中的数据库URL初始化数据库连接"""
|
logger.warning("未获取到URL")
|
||||||
engine = create_engine(
|
return
|
||||||
database_url,
|
|
||||||
pool_size=5,
|
|
||||||
max_overflow=10,
|
|
||||||
echo=False,
|
|
||||||
pool_pre_ping=True
|
|
||||||
)
|
|
||||||
return engine
|
|
||||||
|
|
||||||
|
# 遍历每个URL调用API
|
||||||
|
for i, text in enumerate(list, 1):
|
||||||
|
logger.info(f"处理第 {i}/{len(list)} 个数据: {text}")
|
||||||
|
|
||||||
def fetch_documents(engine, dataset_id: str, limit: Optional[int] = None, offset: Optional[int] = None):
|
if type == 'workflow':
|
||||||
sql = DEFAULT_SQL
|
# 方法1: 使用带inputs参数的调用
|
||||||
if limit:
|
inputs_data = {
|
||||||
sql = sql + f"\nLIMIT {int(limit)}"
|
"content": f"{text}",
|
||||||
if offset:
|
"type":"招聘信息"
|
||||||
sql = sql + f"\nOFFSET {int(offset)}"
|
|
||||||
|
|
||||||
with engine.connect() as conn:
|
|
||||||
result = conn.execute(text(sql), {"dataset_id": dataset_id})
|
|
||||||
rows = result.fetchall()
|
|
||||||
return rows
|
|
||||||
|
|
||||||
|
|
||||||
def call_import_api(api_url: str, text_content: str, timeout: int = 2400):
|
|
||||||
payload = {
|
|
||||||
"text_content": text_content
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
result = api_client.call_api_with_inputs(inputs_data)
|
||||||
|
|
||||||
headers = {"Content-Type": "application/json", "Accept": "application/json"}
|
if result['success']:
|
||||||
resp = requests.post(api_url, json=payload, headers=headers, timeout=timeout)
|
logger.info(f"URL {text} 处理成功")
|
||||||
resp.raise_for_status()
|
|
||||||
return resp.json()
|
|
||||||
|
|
||||||
|
|
||||||
def load_config(path: str) -> dict:
|
|
||||||
try:
|
|
||||||
with open(path, 'r', encoding='utf-8') as f:
|
|
||||||
return json.load(f)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"加载 config 文件失败: {e}")
|
|
||||||
return {}
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
|
|
||||||
# 从配置文件中读取所有参数
|
|
||||||
dataset_id = 'b1b53be2-3407-445f-b7a4-a6e2c717963d'
|
|
||||||
api_url = 'http://10.168.1.163:8099/api/tools/import_recruitment'
|
|
||||||
|
|
||||||
limit =10000
|
|
||||||
offset = 0
|
|
||||||
delay = 0.5
|
|
||||||
database_url = 'postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/dify'
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
logger.info("初始化数据库连接")
|
|
||||||
engine = init_db_from_config(database_url)
|
|
||||||
|
|
||||||
logger.info("开始查询并批量调用 text_import 接口")
|
|
||||||
rows = fetch_documents(engine, dataset_id, limit=limit, offset=offset)
|
|
||||||
total = len(rows)
|
|
||||||
logger.info(f"查询到 {total} 条 document 记录(按 document_id 聚合)")
|
|
||||||
|
|
||||||
success = 0
|
|
||||||
failed = 0
|
|
||||||
|
|
||||||
for idx, row in enumerate(rows, start=1):
|
|
||||||
document_id = row[0]
|
|
||||||
doc_time = row[1]
|
|
||||||
full_text = row[2] or ""
|
|
||||||
|
|
||||||
logger.info(f"[{idx}/{total}] document_id={document_id} doc_time={doc_time}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
res = call_import_api(
|
|
||||||
api_url=api_url,
|
|
||||||
text_content=full_text,
|
|
||||||
timeout=240,
|
|
||||||
)
|
|
||||||
logger.info(f" → API 返回 success={res.get('success')} message={res.get('message')}")
|
|
||||||
if res.get('success'):
|
|
||||||
success += 1
|
|
||||||
else:
|
else:
|
||||||
failed += 1
|
logger.error(f"URL {text} 处理失败: {result.get('error')}")
|
||||||
|
|
||||||
|
|
||||||
|
else:
|
||||||
|
#agent
|
||||||
|
query = text
|
||||||
|
result = api_client.call_api_with_query(query)
|
||||||
|
|
||||||
|
if result['success']:
|
||||||
|
logger.info(f"URL {text} 处理成功")
|
||||||
|
else:
|
||||||
|
logger.error(f"URL {text} 处理失败: {result.get('error')}")
|
||||||
|
|
||||||
|
# 可选:添加延迟避免请求过于频繁
|
||||||
|
if i < len(text):
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
failed += 1
|
logger.error(f"程序执行失败: {e}")
|
||||||
logger.exception(f" ✗ 调用 API 失败: {e}")
|
|
||||||
|
|
||||||
time.sleep(delay)
|
finally:
|
||||||
|
db_manager.disconnect()
|
||||||
|
|
||||||
logger.info(f"完成:成功 {success},失败 {failed},总计 {total}")
|
else:
|
||||||
|
logger.info("调用不带inputs参数的API示例")
|
||||||
|
if type == 'workflow':
|
||||||
|
result2 = api_client.call_api_without_inputs()
|
||||||
|
|
||||||
|
if result2['success']:
|
||||||
|
logger.info("不带inputs参数的API调用成功")
|
||||||
|
else:
|
||||||
|
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
|
||||||
|
|
||||||
|
else:
|
||||||
|
#agent
|
||||||
|
result2 = api_client.call_api_without_query()
|
||||||
|
|
||||||
|
if result2['success']:
|
||||||
|
logger.info("不带inputs参数的API调用成功")
|
||||||
|
else:
|
||||||
|
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"初始化失败: {e}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|
@ -1,135 +1,361 @@
|
||||||
"""批量从 document_segments 聚合并调用本地 text_import 接口的脚本
|
|
||||||
|
|
||||||
用法示例(在项目根目录运行):
|
|
||||||
python scripts/batch_import_from_segments.py
|
|
||||||
|
|
||||||
说明:脚本会把每个 document 的拼接段落(full_text)作为 text_content,按顺序逐条调用接口。
|
|
||||||
配置文件路径默认为 scripts/config.json
|
|
||||||
"""
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import time
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
from sqlalchemy import create_engine, text
|
import json
|
||||||
|
import psycopg2
|
||||||
|
from psycopg2.extras import RealDictCursor
|
||||||
|
import time
|
||||||
|
from typing import List, Dict, Any
|
||||||
|
import logging
|
||||||
|
|
||||||
logger = logging.getLogger("batch_import")
|
# 配置日志
|
||||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
DEFAULT_SQL = """
|
class APIClient:
|
||||||
SELECT
|
"""简化的API客户端类"""
|
||||||
ds.document_id,
|
|
||||||
MIN(ds.created_at) AS doc_time,
|
def __init__(self, api_url: str, auth_token: str):
|
||||||
|
self.api_url = api_url
|
||||||
|
self.headers = {
|
||||||
|
'Authorization': auth_token,
|
||||||
|
'Content-Type': 'application/json'
|
||||||
|
}
|
||||||
|
|
||||||
|
def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
"""直接调用带inputs参数的API
|
||||||
|
|
||||||
|
Args:
|
||||||
|
inputs: inputs参数字典
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
API响应数据
|
||||||
|
"""
|
||||||
|
payload = json.dumps({
|
||||||
|
"inputs": inputs,
|
||||||
|
"response_mode": "blocking",
|
||||||
|
"user": "admin"
|
||||||
|
})
|
||||||
|
logger.info(f"调用API,payload: {payload}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info("调用带inputs参数的API")
|
||||||
|
response = requests.post(
|
||||||
|
self.api_url,
|
||||||
|
headers=self.headers,
|
||||||
|
data=payload,
|
||||||
|
timeout=2400
|
||||||
|
)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
logger.info(f"API调用成功,状态码: {response.status_code}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"status_code": response.status_code,
|
||||||
|
"data": response.json()
|
||||||
|
}
|
||||||
|
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
logger.error(f"API调用失败: {e}")
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
|
||||||
|
"""直接调用不带inputs参数的API
|
||||||
|
|
||||||
|
Args:
|
||||||
|
other_params: 其他参数字典
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
API响应数据
|
||||||
|
"""
|
||||||
|
payload = {
|
||||||
|
"inputs": {},
|
||||||
|
"response_mode": "blocking",
|
||||||
|
"user": "admin"
|
||||||
|
}
|
||||||
|
|
||||||
|
# 添加其他参数
|
||||||
|
if other_params:
|
||||||
|
payload.update(other_params)
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info("调用不带inputs参数的API")
|
||||||
|
response = requests.post(
|
||||||
|
self.api_url,
|
||||||
|
headers=self.headers,
|
||||||
|
data=json.dumps(payload),
|
||||||
|
timeout=1200
|
||||||
|
)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
logger.info(f"API调用成功,状态码: {response.status_code}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"status_code": response.status_code,
|
||||||
|
"data": response.json()
|
||||||
|
}
|
||||||
|
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
logger.error(f"API调用失败: {e}")
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def call_api_with_query(self, query: str = None) -> Dict[str, Any]:
|
||||||
|
"""直接调用带inputs参数的API
|
||||||
|
|
||||||
|
Args:
|
||||||
|
query: 查询语句
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
API响应数据
|
||||||
|
"""
|
||||||
|
payload = json.dumps({
|
||||||
|
"inputs": {},
|
||||||
|
"query" : query,
|
||||||
|
"response_mode": "streaming",
|
||||||
|
"user": "admin"
|
||||||
|
})
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info("调用带inputs参数的API")
|
||||||
|
response = requests.post(
|
||||||
|
self.api_url,
|
||||||
|
headers=self.headers,
|
||||||
|
data=payload,
|
||||||
|
timeout=2400
|
||||||
|
)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
logger.info(f"API调用成功,状态码: {response.status_code}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"status_code": response.status_code,
|
||||||
|
"data": response.json()
|
||||||
|
}
|
||||||
|
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
logger.error(f"API调用失败: {e}")
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
|
||||||
|
"""直接调用不带inputs参数的API
|
||||||
|
|
||||||
|
Args:
|
||||||
|
other_params: 其他参数字典
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
API响应数据
|
||||||
|
"""
|
||||||
|
payload = {
|
||||||
|
"inputs": {},
|
||||||
|
"query":"",
|
||||||
|
"response_mode": "streaming",
|
||||||
|
"user": "admin"
|
||||||
|
}
|
||||||
|
|
||||||
|
# 添加其他参数
|
||||||
|
if other_params:
|
||||||
|
payload.update(other_params)
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info("调用不带inputs参数的API")
|
||||||
|
response = requests.post(
|
||||||
|
self.api_url,
|
||||||
|
headers=self.headers,
|
||||||
|
data=json.dumps(payload),
|
||||||
|
timeout=2400
|
||||||
|
)
|
||||||
|
|
||||||
|
response.raise_for_status()
|
||||||
|
logger.info(f"API调用成功,状态码: {response.status_code}")
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"status_code": response.status_code,
|
||||||
|
"data": response.json()
|
||||||
|
}
|
||||||
|
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
logger.error(f"API调用失败: {e}")
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
class DatabaseManager:
|
||||||
|
"""数据库管理类"""
|
||||||
|
|
||||||
|
def __init__(self, db_config: Dict[str, str]):
|
||||||
|
self.db_config = db_config
|
||||||
|
self.connection = None
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
"""连接数据库"""
|
||||||
|
try:
|
||||||
|
self.connection = psycopg2.connect(
|
||||||
|
host=self.db_config.get('host', 'localhost'),
|
||||||
|
port=self.db_config.get('port', '5432'),
|
||||||
|
database=self.db_config.get('database', 'postgres'),
|
||||||
|
user=self.db_config.get('user', 'postgres'),
|
||||||
|
password=self.db_config.get('password', ''),
|
||||||
|
cursor_factory=RealDictCursor
|
||||||
|
)
|
||||||
|
logger.info("数据库连接成功")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"数据库连接失败: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
"""断开数据库连接"""
|
||||||
|
if self.connection:
|
||||||
|
self.connection.close()
|
||||||
|
logger.info("数据库连接已关闭")
|
||||||
|
|
||||||
|
def get_urls_from_database(self, query: str = None) -> List[str]:
|
||||||
|
"""从数据库获取URL列表"""
|
||||||
|
if not self.connection:
|
||||||
|
self.connect()
|
||||||
|
|
||||||
|
# 默认查询语句,根据实际情况修改
|
||||||
|
if query is None:
|
||||||
|
query = "SELECT 1"
|
||||||
|
|
||||||
|
try:
|
||||||
|
with self.connection.cursor() as cursor:
|
||||||
|
cursor.execute(query)
|
||||||
|
results = cursor.fetchall()
|
||||||
|
|
||||||
|
# 处理结果,返回字符串列表
|
||||||
|
datas = [list(row.values())[0] for row in results]
|
||||||
|
return datas
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"查询数据库失败: {e}")
|
||||||
|
return []
|
||||||
|
#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp
|
||||||
|
#DATABASE_SCHEMA=p70_ai_intelligence
|
||||||
|
def main():
|
||||||
|
"""主函数"""
|
||||||
|
# 数据库配置
|
||||||
|
db_config = {
|
||||||
|
'host': '124.221.232.219',
|
||||||
|
'port': '5432',
|
||||||
|
'database': 'dify',
|
||||||
|
'user': 'dbuser_dba',
|
||||||
|
'password': 'EmBRxnmmjnE3',
|
||||||
|
'schema': 'p70_ai_intelligence'
|
||||||
|
}
|
||||||
|
|
||||||
|
# API配置
|
||||||
|
api_config = {
|
||||||
|
'url': 'https://tk-agent.idgvalue.com/v1/workflows/run',
|
||||||
|
'auth_token': 'Bearer app-siNKLRExkzO9IM81P8hHuBx2'
|
||||||
|
}
|
||||||
|
|
||||||
|
api_client = APIClient(api_config['url'], api_config['auth_token'])
|
||||||
|
|
||||||
|
type = 'agent'
|
||||||
|
type = 'workflow'
|
||||||
|
|
||||||
|
try:
|
||||||
|
flag = True
|
||||||
|
|
||||||
|
if flag:
|
||||||
|
# 初始化
|
||||||
|
db_manager = DatabaseManager(db_config)
|
||||||
|
custom_query = """SELECT
|
||||||
string_agg(ds.content, '' ORDER BY ds.position) AS full_text
|
string_agg(ds.content, '' ORDER BY ds.position) AS full_text
|
||||||
FROM public.document_segments ds
|
FROM public.document_segments ds
|
||||||
WHERE ds.dataset_id = '8d65631e-0aaf-41b4-be15-7dacb7529dd8'
|
WHERE ds.dataset_id = '8d65631e-0aaf-41b4-be15-7dacb7529dd8'
|
||||||
AND ds.enabled
|
AND ds.enabled
|
||||||
AND ds.created_at >= NOW() - INTERVAL '24 hours'
|
AND ds.created_at >= NOW() - INTERVAL '48 hours'
|
||||||
GROUP BY ds.document_id
|
GROUP BY ds.document_id
|
||||||
ORDER BY doc_time DESC
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 从数据库获取URL列表
|
||||||
|
list = db_manager.get_urls_from_database(custom_query)
|
||||||
|
|
||||||
def init_db_from_config(database_url: str):
|
if not list:
|
||||||
"""根据配置中的数据库URL初始化数据库连接"""
|
logger.warning("未获取到URL")
|
||||||
engine = create_engine(
|
return
|
||||||
database_url,
|
|
||||||
pool_size=5,
|
|
||||||
max_overflow=10,
|
|
||||||
echo=False,
|
|
||||||
pool_pre_ping=True
|
|
||||||
)
|
|
||||||
return engine
|
|
||||||
|
|
||||||
|
# 遍历每个URL调用API
|
||||||
|
for i, text in enumerate(list, 3):
|
||||||
|
logger.info(f"处理第 {i}/{len(list)} 个数据: {text}")
|
||||||
|
|
||||||
def fetch_documents(engine, dataset_id: str, limit: Optional[int] = None, offset: Optional[int] = None):
|
if type == 'workflow':
|
||||||
sql = DEFAULT_SQL
|
# 方法1: 使用带inputs参数的调用
|
||||||
if limit:
|
inputs_data = {
|
||||||
sql = sql + f"\nLIMIT {int(limit)}"
|
'content': f"{text}",
|
||||||
if offset:
|
'type':"爬虫信息"
|
||||||
sql = sql + f"\nOFFSET {int(offset)}"
|
|
||||||
|
|
||||||
with engine.connect() as conn:
|
|
||||||
result = conn.execute(text(sql), {"dataset_id": dataset_id})
|
|
||||||
rows = result.fetchall()
|
|
||||||
return rows
|
|
||||||
|
|
||||||
|
|
||||||
def call_import_api(api_url: str, text_content: str, timeout: int = 2400):
|
|
||||||
payload = {
|
|
||||||
"text_content": text_content
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
result = api_client.call_api_with_inputs(inputs_data)
|
||||||
|
|
||||||
headers = {"Content-Type": "application/json", "Accept": "application/json"}
|
if result['success']:
|
||||||
resp = requests.post(api_url, json=payload, headers=headers, timeout=timeout)
|
logger.info(f"URL {text} 处理成功")
|
||||||
resp.raise_for_status()
|
|
||||||
return resp.json()
|
|
||||||
|
|
||||||
|
|
||||||
def load_config(path: str) -> dict:
|
|
||||||
try:
|
|
||||||
with open(path, 'r', encoding='utf-8') as f:
|
|
||||||
return json.load(f)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"加载 config 文件失败: {e}")
|
|
||||||
return {}
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
|
|
||||||
# 从配置文件中读取所有参数
|
|
||||||
dataset_id = '8d65631e-0aaf-41b4-be15-7dacb7529dd8'
|
|
||||||
api_url = 'http://10.168.1.163:8099/api/tools/import_crawer'
|
|
||||||
|
|
||||||
limit =10000
|
|
||||||
offset = 0
|
|
||||||
delay = 0.5
|
|
||||||
database_url = 'postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/dify'
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
logger.info("初始化数据库连接")
|
|
||||||
engine = init_db_from_config(database_url)
|
|
||||||
|
|
||||||
logger.info("开始查询并批量调用 text_import 接口")
|
|
||||||
rows = fetch_documents(engine, dataset_id, limit=limit, offset=offset)
|
|
||||||
total = len(rows)
|
|
||||||
logger.info(f"查询到 {total} 条 document 记录(按 document_id 聚合)")
|
|
||||||
|
|
||||||
success = 0
|
|
||||||
failed = 0
|
|
||||||
|
|
||||||
for idx, row in enumerate(rows, start=1):
|
|
||||||
document_id = row[0]
|
|
||||||
doc_time = row[1]
|
|
||||||
full_text = row[2] or ""
|
|
||||||
|
|
||||||
logger.info(f"[{idx}/{total}] document_id={document_id} doc_time={doc_time}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
res = call_import_api(
|
|
||||||
api_url=api_url,
|
|
||||||
text_content=full_text,
|
|
||||||
timeout=240,
|
|
||||||
)
|
|
||||||
logger.info(f" → API 返回 success={res.get('success')} message={res.get('message')}")
|
|
||||||
if res.get('success'):
|
|
||||||
success += 1
|
|
||||||
else:
|
else:
|
||||||
failed += 1
|
logger.error(f"URL {text} 处理失败: {result.get('error')}")
|
||||||
|
|
||||||
|
|
||||||
|
else:
|
||||||
|
#agent
|
||||||
|
query = text
|
||||||
|
result = api_client.call_api_with_query(query)
|
||||||
|
|
||||||
|
if result['success']:
|
||||||
|
logger.info(f"URL {text} 处理成功")
|
||||||
|
else:
|
||||||
|
logger.error(f"URL {text} 处理失败: {result.get('error')}")
|
||||||
|
|
||||||
|
# 可选:添加延迟避免请求过于频繁
|
||||||
|
if i < len(text):
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
failed += 1
|
logger.error(f"程序执行失败: {e}")
|
||||||
logger.exception(f" ✗ 调用 API 失败: {e}")
|
|
||||||
|
|
||||||
time.sleep(delay)
|
finally:
|
||||||
|
db_manager.disconnect()
|
||||||
|
|
||||||
logger.info(f"完成:成功 {success},失败 {failed},总计 {total}")
|
else:
|
||||||
|
logger.info("调用不带inputs参数的API示例")
|
||||||
|
if type == 'workflow':
|
||||||
|
result2 = api_client.call_api_without_inputs()
|
||||||
|
|
||||||
|
if result2['success']:
|
||||||
|
logger.info("不带inputs参数的API调用成功")
|
||||||
|
else:
|
||||||
|
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
|
||||||
|
|
||||||
|
else:
|
||||||
|
#agent
|
||||||
|
result2 = api_client.call_api_without_query()
|
||||||
|
|
||||||
|
if result2['success']:
|
||||||
|
logger.info("不带inputs参数的API调用成功")
|
||||||
|
else:
|
||||||
|
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"初始化失败: {e}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
Loading…
Reference in New Issue