add workflow 市场-Agents调度,dev

This commit is contained in:
root 2025-11-20 11:44:35 +08:00
parent b3af3b1f97
commit 3fb8035795
1 changed files with 24 additions and 145 deletions

View File

@ -23,98 +23,12 @@ class APIClient:
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]: def call_api_with_inputs(self, inputs: Dict[str, Any] = None, query: str = None) -> Dict[str, Any]:
"""直接调用带inputs参数的API """直接调用带inputs参数的API
Args: Args:
inputs: inputs参数字典 inputs: inputs参数字典
Returns:
API响应数据
"""
payload = json.dumps({
"inputs": inputs,
"response_mode": "blocking",
"user": "admin"
})
try:
logger.info("调用带inputs参数的API")
response = requests.post(
self.api_url,
headers=self.headers,
data=payload,
timeout=1200
)
response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}")
return {
"success": True,
"status_code": response.status_code,
"data": response.json()
}
except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}")
return {
"success": False,
"error": str(e)
}
def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
"""直接调用不带inputs参数的API
Args:
other_params: 其他参数字典
Returns:
API响应数据
"""
inputs_data = {}
payload = {
"inputs": inputs_data,
"response_mode": "blocking",
"user": "admin"
}
# 添加其他参数
if other_params:
payload.update(other_params)
try:
logger.info("调用不带inputs参数的API")
response = requests.post(
self.api_url,
headers=self.headers,
data=json.dumps(payload),
timeout=1200
)
response.raise_for_status()
logger.info(f"API调用成功状态码: {response.status_code}")
return {
"success": True,
"status_code": response.status_code,
"data": response.json()
}
except requests.exceptions.RequestException as e:
logger.error(f"API调用失败: {e}")
return {
"success": False,
"error": str(e)
}
def call_api_with_query(self, inputs: Dict[str, Any] = None, query: str = None) -> Dict[str, Any]:
"""直接调用带inputs参数的API
Args:
query: 查询语句
Returns: Returns:
API响应数据 API响应数据
""" """
@ -150,7 +64,7 @@ class APIClient:
"error": str(e) "error": str(e)
} }
def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]: def call_api_without_inputs(self, other_params: Dict[str, Any] = None, query:str = None) -> Dict[str, Any]:
"""直接调用不带inputs参数的API """直接调用不带inputs参数的API
Args: Args:
@ -160,11 +74,10 @@ class APIClient:
API响应数据 API响应数据
""" """
payload = { payload = {
"inputs": None, "inputs": {},
"query": "", "query": "",
"response_mode": "streaming", "response_mode": "blocking",
"user": "admin", "user": "admin"
"conversation_id":""
} }
# 添加其他参数 # 添加其他参数
@ -177,7 +90,7 @@ class APIClient:
self.api_url, self.api_url,
headers=self.headers, headers=self.headers,
data=json.dumps(payload), data=json.dumps(payload),
timeout=2400 timeout=1200
) )
response.raise_for_status() response.raise_for_status()
@ -262,22 +175,19 @@ def main():
# API配置 # API配置
api_config = { api_config = {
'url': 'http://agent.idgvalue.com/v1/chat-messages', 'url': 'https://agent.idgvalue.com/v1/chat-messages',
'auth_token': 'Bearer app-yAPAzkPbAaV8l1SuZBjXHTU4' 'auth_token': 'Bearer app-yAPAzkPbAaV8l1SuZBjXHTU4'
} }
api_client = APIClient(api_config['url'], api_config['auth_token']) api_client = APIClient(api_config['url'], api_config['auth_token'])
type = 'agent'
type = 'agent'
try: try:
flag = True flag = True
if flag: if flag:
# 初始化 # 初始化
db_manager = DatabaseManager(db_config) db_manager = DatabaseManager(db_config)
custom_query = """ select "客户名称" from p99_temp.tk_product_prefer_list limit 1""" custom_query = """ select "客户名称" from p99_temp.tk_product_prefer_list """
try: try:
# 从数据库获取URL列表 # 从数据库获取URL列表
@ -291,26 +201,11 @@ def main():
for i, text in enumerate(list, 1): for i, text in enumerate(list, 1):
logger.info(f"处理第 {i}/{len(list)} 个数据: {text}") logger.info(f"处理第 {i}/{len(list)} 个数据: {text}")
if type == 'workflow':
# 方法1: 使用带inputs参数的调用 # 方法1: 使用带inputs参数的调用
inputs_data = {
"urls": f"{text}"
}
logger.info(f"调用call_api_with_inputs")
result = api_client.call_api_with_inputs(inputs_data)
if result['success']:
logger.info(f"URL {text} 处理成功")
else:
logger.error(f"URL {text} 处理失败: {result.get('error')}")
else:
#agent
inputs_data = {} inputs_data = {}
query = text query = text
logger.info(f"call_api_with_query")
result = api_client.call_api_with_query(inputs_data, query) result = api_client.call_api_with_inputs(inputs_data,query)
if result['success']: if result['success']:
logger.info(f"URL {text} 处理成功") logger.info(f"URL {text} 处理成功")
@ -320,7 +215,6 @@ def main():
# 可选:添加延迟避免请求过于频繁 # 可选:添加延迟避免请求过于频繁
if i < len(text): if i < len(text):
time.sleep(1) time.sleep(1)
except Exception as e: except Exception as e:
logger.error(f"程序执行失败: {e}") logger.error(f"程序执行失败: {e}")
@ -329,10 +223,6 @@ def main():
else: else:
logger.info("调用不带inputs参数的API示例") logger.info("调用不带inputs参数的API示例")
if type == 'workflow':
inputs_data = {}
result2 = api_client.call_api_without_inputs() result2 = api_client.call_api_without_inputs()
if result2['success']: if result2['success']:
@ -340,17 +230,6 @@ def main():
else: else:
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}") logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
else:
#agent
inputs_data = {}
result2 = api_client.call_api_without_query()
if result2['success']:
logger.info("不带inputs参数的API调用成功")
else:
logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
except Exception as e: except Exception as e:
logger.error(f"初始化失败: {e}") logger.error(f"初始化失败: {e}")