add workflow Append-客户信息补全,dev
This commit is contained in:
parent
856774c789
commit
5c7be890d8
|
|
@ -0,0 +1,427 @@
|
|||
import requests
|
||||
import json
|
||||
import psycopg2
|
||||
from psycopg2.extras import RealDictCursor
|
||||
import time
|
||||
from typing import List, Dict, Any
|
||||
import logging
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import threading
|
||||
|
||||
# 配置日志
|
||||
# Logging setup: INFO-level records formatted as "time - level - message",
# plus the module-scoped logger that the rest of this file writes to.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
|
||||
|
||||
class APIClient:
    """Minimal HTTP client that POSTs JSON payloads to a single endpoint.

    Every ``call_api_*`` method reports its outcome as a dict instead of
    raising:

    * success: ``{"success": True, "status_code": int, "data": <parsed JSON>}``
    * failure: ``{"success": False, "error": str}``; ``"status_code"`` is
      included whenever a response was actually received.
    """

    def __init__(self, api_url: str, auth_token: str):
        """Store the endpoint and build the headers sent with every request.

        Args:
            api_url: URL that every request is POSTed to.
            auth_token: Value sent verbatim in the ``Authorization`` header.
        """
        self.api_url = api_url
        self.headers = {
            'Authorization': auth_token,
            'Content-Type': 'application/json'
        }

    def _post(self, payload: Dict[str, Any], description: str,
              timeout: int = 300, log_response: bool = False) -> Dict[str, Any]:
        """POST ``payload`` as JSON and translate the outcome into a result dict.

        Args:
            payload: JSON-serialisable request body.
            description: Message logged right before the request is sent.
            timeout: Timeout in seconds handed to ``requests.post``.
            log_response: When true, also log the status code and the first
                500 characters of the response body before checking the status.

        Returns:
            The result dict described in the class docstring.
        """
        try:
            logger.info(description)
            response = requests.post(
                self.api_url,
                headers=self.headers,
                json=payload,  # let requests serialise the body
                timeout=timeout
            )
            if log_response:
                logger.info(f"API调用完成,状态码: {response.status_code}")
                logger.info(f"Response content: {response.text[:500]}")

            response.raise_for_status()
            logger.info(f"API调用成功,状态码: {response.status_code}")

        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
            return {
                "success": False,
                "error": f"HTTP {e.response.status_code}: {e.response.text}",
                "status_code": e.response.status_code
            }
        except requests.exceptions.RequestException as e:
            logger.error(f"API调用失败: {e}")
            return {
                "success": False,
                "error": str(e)
            }

        # Parse the body separately so that a 2xx response with an empty or
        # non-JSON body is reported as a failure instead of escaping as an
        # exception.
        # NOTE(review): the "streaming" response modes presumably return an
        # event stream rather than one JSON document - confirm with the API.
        try:
            data = response.json() if response.content else {}
        except ValueError as e:
            logger.error(f"API调用失败: {e}")
            return {
                "success": False,
                "error": str(e),
                "status_code": response.status_code
            }

        return {
            "success": True,
            "status_code": response.status_code,
            "data": data
        }

    def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Call the API in blocking mode with the given ``inputs``.

        Args:
            inputs: Dict sent as the ``inputs`` field of the request body.

        Returns:
            The result dict described in the class docstring.
        """
        payload = {
            "inputs": inputs,
            "response_mode": "blocking",
            "user": "admin"
        }
        logger.info(f"调用API,payload: {json.dumps(payload, ensure_ascii=False)}")
        return self._post(payload, "调用带inputs参数的API", log_response=True)

    def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Call the API in blocking mode with empty ``inputs``.

        Args:
            other_params: Optional extra top-level fields merged into the
                request body (they override the defaults on key clashes).

        Returns:
            The result dict described in the class docstring.
        """
        payload = {
            "inputs": {},
            "response_mode": "blocking",
            "user": "admin"
        }
        if other_params:
            payload.update(other_params)
        return self._post(payload, "调用不带inputs参数的API")

    def call_api_with_query(self, query: str = None) -> Dict[str, Any]:
        """Call the API in streaming mode with a ``query`` string.

        Args:
            query: Text sent as the ``query`` field of the request body.

        Returns:
            The result dict described in the class docstring.
        """
        payload = {
            "inputs": {},
            "query": query,
            "response_mode": "streaming",
            "user": "admin"
        }
        return self._post(payload, "调用带inputs参数的API")

    def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Call the API in streaming mode with an empty ``query``.

        Args:
            other_params: Optional extra top-level fields merged into the
                request body (they override the defaults on key clashes).

        Returns:
            The result dict described in the class docstring.
        """
        payload = {
            "inputs": {},
            "query": "",
            "response_mode": "streaming",
            "user": "admin"
        }
        if other_params:
            payload.update(other_params)
        # This call keeps its longer 2400-second timeout.
        return self._post(payload, "调用不带inputs参数的API", timeout=2400)
|
||||
|
||||
class DatabaseManager:
    """Owns one psycopg2 connection and runs queries on it."""

    def __init__(self, db_config: Dict[str, str]):
        """Remember the connection settings; no connection is opened yet.

        Args:
            db_config: Connection settings. The keys ``host``, ``port``,
                ``database``, ``user`` and ``password`` are read by
                :meth:`connect`; missing keys fall back to local defaults.
        """
        self.db_config = db_config
        self.connection = None

    def connect(self):
        """Open the connection; rows are returned as dict-like records.

        Raises:
            Exception: Whatever ``psycopg2.connect`` raised, after logging it.
        """
        try:
            self.connection = psycopg2.connect(
                host=self.db_config.get('host', 'localhost'),
                port=self.db_config.get('port', '5432'),
                database=self.db_config.get('database', 'postgres'),
                user=self.db_config.get('user', 'postgres'),
                password=self.db_config.get('password', ''),
                cursor_factory=RealDictCursor
            )
            logger.info("数据库连接成功")
        except Exception as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def disconnect(self):
        """Close the connection if one is open; otherwise do nothing."""
        if self.connection:
            self.connection.close()
            # Forget the closed handle so a later query reconnects instead of
            # reusing a dead connection.
            self.connection = None
            logger.info("数据库连接已关闭")

    def get_urls_from_database(self, query: str = None) -> List[Dict[str, Any]]:
        """Run ``query`` and return all rows as a list of plain dicts.

        Connects first if no connection is open (a connection failure
        propagates to the caller).

        Args:
            query: SQL text to execute; defaults to ``SELECT 1``.

        Returns:
            One dict per row, or an empty list when the query fails (the
            error is logged, not raised).
        """
        if not self.connection:
            self.connect()

        # Default statement; adjust to the real use case.
        if query is None:
            query = "SELECT 1"

        try:
            # "with connection" commits on success and rolls back on error,
            # so the session is never left inside an open or aborted
            # transaction after this call.
            with self.connection:
                with self.connection.cursor() as cursor:
                    cursor.execute(query)
                    results = cursor.fetchall()
            return [dict(row) for row in results]

        except Exception as e:
            logger.error(f"查询数据库失败: {e}")
            return []
|
||||
|
||||
# Row columns forwarded to the workflow as its ``inputs`` fields.
_WORKFLOW_INPUT_KEYS = ('company1', 'address1', 'city1', 'company2', 'address2', 'city2')


def process_single_item(api_client, item, type, item_index, total_count):
    """Send one row to the API and return the client's result dict.

    Args:
        api_client: Object providing ``call_api_with_inputs(inputs)`` and
            ``call_api_with_query(query)``, each returning a dict with a
            ``"success"`` key.
        item: Mapping holding the row's columns.
        type: ``'workflow'`` sends the six company/address/city columns as
            ``inputs``; any other value sends an ``ID``/address query string.
            (The name shadows the builtin but is kept for existing callers.)
        item_index: 1-based position of this row, used only for logging.
        total_count: Total number of rows, used only for logging.

    Returns:
        The dict returned by the API client, or
        ``{"success": False, "error": str}`` if anything raised.
    """
    logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item}")

    def _text(key):
        # Missing keys and None (SQL NULL) both become "" - formatting None
        # directly would send the literal string "None" to the API.
        value = item.get(key, '')
        return '' if value is None else str(value)

    try:
        if type == 'workflow':
            inputs_data = {key: _text(key) for key in _WORKFLOW_INPUT_KEYS}
            result = api_client.call_api_with_inputs(inputs_data)
        else:
            # Agent mode: flatten the row into a single query string.
            query = f"ID: {_text('id')}, 地址: {_text('address')}"
            result = api_client.call_api_with_query(query)

        if result['success']:
            logger.info(f"数据 {item} 处理成功")
        else:
            logger.error(f"数据 {item} 处理失败: {result.get('error')}")
        return result

    except Exception as e:
        # Keep one bad row from taking down the whole batch.
        logger.error(f"处理数据 {item} 时发生异常: {e}")
        return {"success": False, "error": str(e)}
|
||||
|
||||
# DATABASE_URL=postgresql://<user>:<password>@<host>:5432/daas_mpp  (credentials redacted - keep real values out of source control)
|
||||
#DATABASE_SCHEMA=p70_ai_intelligence
|
||||
def main():
    """Fetch the mismatched account rows and send each one to the API.

    Reads up to 100 rows whose city differs between the two account tables,
    then processes them concurrently with ``process_single_item`` and logs the
    success/failure totals.

    Secrets are taken from the environment instead of being hard-coded:

    * ``ACCOUNT_APPEND_DB_PASSWORD`` - database password (required)
    * ``ACCOUNT_APPEND_API_TOKEN``   - API key, with or without the
      ``Bearer `` prefix (required)

    ``ACCOUNT_APPEND_DB_HOST``, ``_DB_PORT``, ``_DB_NAME``, ``_DB_USER`` and
    ``ACCOUNT_APPEND_API_URL`` optionally override the built-in defaults.

    Raises:
        SystemExit: With exit code 1 when a required secret is missing.
    """
    import os  # function-scope import: only main() reads the environment

    # Number of concurrent API calls (5 or 10 are the suggested values).
    MAX_WORKERS = 10

    db_password = os.environ.get('ACCOUNT_APPEND_DB_PASSWORD')
    api_token = os.environ.get('ACCOUNT_APPEND_API_TOKEN')
    if not db_password or not api_token:
        logger.error("缺少环境变量 ACCOUNT_APPEND_DB_PASSWORD 或 ACCOUNT_APPEND_API_TOKEN")
        raise SystemExit(1)

    # Database settings.
    db_config = {
        'host': os.environ.get('ACCOUNT_APPEND_DB_HOST', '124.221.232.219'),
        'port': os.environ.get('ACCOUNT_APPEND_DB_PORT', '5432'),
        'database': os.environ.get('ACCOUNT_APPEND_DB_NAME', 'daas_mpp'),
        'user': os.environ.get('ACCOUNT_APPEND_DB_USER', 'dbuser_dba'),
        'password': db_password,
        'schema': 'p70_ai_intelligence'
    }

    # API settings; the Authorization header must carry the "Bearer " prefix.
    api_config = {
        'url': os.environ.get('ACCOUNT_APPEND_API_URL',
                              'https://agent.idgvalue.com/v1/workflows/run'),
        'auth_token': api_token if api_token.startswith('Bearer ') else f"Bearer {api_token}"
    }

    api_client = APIClient(api_config['url'], api_config['auth_token'])

    # 'workflow' sends structured inputs; any other value uses the agent
    # (query-string) endpoints.
    mode = 'workflow'

    # True: process rows from the database. False: fire a single call without
    # inputs as a connectivity check.
    run_batch = True

    try:
        if run_batch:
            db_manager = DatabaseManager(db_config)
            custom_query = """
                SELECT
                    t2.name as company1,
                    t2.city_name as city1,
                    t2.address_detail as address1,

                    t1.account as company2,
                    t1.city as city2,
                    t1.address as address2

                FROM p70_ai_intelligence.agent_account_info t1
                left join p30_common.v_sql_cleaned_cn_d_account_info t2
                on t2."name" = t1."search"
                where t1.city <> t2.city_name
                and t2.name not in (select search from p70_ai_intelligence.agent_account_info_append)

                limit 100
            """

            try:
                rows = db_manager.get_urls_from_database(custom_query)

                if not rows:
                    logger.warning("未获取到URL")
                    return

                total = len(rows)
                logger.info(f"总共需要处理 {total} 个项目,使用 {MAX_WORKERS} 个并发线程")

                success_count = 0
                failed_count = 0

                with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                    # Submit everything up front; map each future back to its
                    # row so failures can be logged with context.
                    future_to_item = {
                        executor.submit(process_single_item, api_client, row, mode, position, total): row
                        for position, row in enumerate(rows, start=1)
                    }

                    for future in as_completed(future_to_item):
                        row = future_to_item[future]
                        try:
                            if future.result().get("success"):
                                success_count += 1
                            else:
                                failed_count += 1
                        except Exception as e:
                            logger.error(f"处理项目 {row} 时发生异常: {e}")
                            failed_count += 1

                logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}")

            except Exception as e:
                logger.error(f"程序执行失败: {e}")

            finally:
                db_manager.disconnect()

        else:
            logger.info("调用不带inputs参数的API示例")
            if mode == 'workflow':
                smoke_result = api_client.call_api_without_inputs()
            else:
                smoke_result = api_client.call_api_without_query()

            if smoke_result['success']:
                logger.info("不带inputs参数的API调用成功")
            else:
                logger.error(f"不带inputs参数的API调用失败: {smoke_result.get('error')}")

    except Exception as e:
        logger.error(f"初始化失败: {e}")


if __name__ == "__main__":
    main()
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
#!/usr/bin/python
|
||||
# -*- encoding=utf-8 -*-
|
||||
from airflow import DAG
|
||||
from datetime import datetime, timedelta
|
||||
from airflow.contrib.hooks.ssh_hook import SSHHook
|
||||
from airflow.contrib.operators.ssh_operator import SSHOperator
|
||||
from airflow.sensors.external_task_sensor import ExternalTaskSensor
|
||||
import json
|
||||
|
||||
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
||||
from airflow.operators.email_operator import EmailOperator
|
||||
from airflow.utils.trigger_rule import TriggerRule
|
||||
|
||||
|
||||
# SSH connection (Airflow connection id 'ssh_air') used to run the job remotely.
sshHook = SSHHook(ssh_conn_id='ssh_air')

# Defaults applied to every task in the DAG unless a task overrides them.
default_args = {
    'owner': 'tek_newsletter@163.com',
    'email_on_failure': True,
    'email_on_retry': True,
    'start_date': datetime(2024, 1, 1),
    'depends_on_past': False,
    'retries': 6,
    'retry_delay': timedelta(minutes=10),
}

# schedule=None: the DAG has no schedule of its own and runs only when triggered.
dag = DAG(
    'wf_dag_account_complete_append',
    default_args=default_args,
    schedule=None,
    catchup=False,
    dagrun_timeout=timedelta(minutes=600),
    max_active_runs=3,
    tags=['64cbd1bbced14209b5e3a879f89e8ab1', 'TK_Cust', 'Append-客户信息补全'],
)

# Failure notification: runs as soon as one upstream task has failed.
task_failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="task_failed",
    to=["tek_newsletter@163.com"],
    # No CC recipients; the previous [""] added an empty address to the mail.
    cc=None,
    subject="account_complete_append_failed",
    html_content='<h3>您好,account_complete_append作业失败,请及时处理</h3>',
)

# A Python identifier cannot start with a digit, so the variable carries a
# "task_" prefix; the task_id keeps its original value.
# NOTE(review): some SSH provider versions apply a short default command
# timeout - confirm the script's runtime fits or set cmd_timeout explicitly.
task_156_account_complete_append = SSHOperator(
    ssh_hook=sshHook,
    task_id='156_account_complete_append',
    command='cd /data/airflow/etl/agents && python 156_account_complete_append.py',
    params={'my_param': "156_account_complete_append"},
    depends_on_past=False,
    retries=1,
    dag=dag,
)

task_156_account_complete_append >> task_failed
|
||||
Loading…
Reference in New Issue