add workflow 客户地址信息提取,dev

This commit is contained in:
root 2025-12-05 17:13:39 +08:00
parent 6b0017f9db
commit 0e1154e4aa
2 changed files with 453 additions and 0 deletions

View File

@@ -0,0 +1,53 @@
#!/usr/bin/python
# -*- encoding=utf-8 -*-
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import json
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
# SSH hook bound to the Airflow connection id 'ssh_air'; it is handed to the
# SSHOperator that runs the job on the remote host.
sshHook = SSHHook(ssh_conn_id='ssh_air')

# Defaults applied to every task of this DAG unless a task overrides them.
default_args = {
    'owner': 'tek_newsletter@163.com',
    # email_on_failure / email_on_retry need a recipient list in 'email';
    # without it the two flags below have nobody to notify.
    'email': ['tek_newsletter@163.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'start_date': datetime(2024, 1, 1),
    'depends_on_past': False,
    'retries': 6,
    'retry_delay': timedelta(minutes=10),
}

# Manually / externally triggered DAG (schedule=None), no backfill.
dag = DAG(
    'wf_dag_address_complete',
    default_args=default_args,
    schedule=None,
    catchup=False,
    # 1200 minutes = 20 hours per DAG run before it is timed out.
    dagrun_timeout=timedelta(minutes=1200),
    max_active_runs=3,
    tags=['64cbd1bbced14209b5e3a879f89e8ab1', 'TK_Cust', '客户地址信息提取'],
)
# Alert task: with ONE_FAILED it runs as soon as at least one upstream task
# has failed, and mails the address below.
# NOTE(review): if this is the only leaf task of the DAG, a successfully sent
# alert may let the DAG run finish as "success" even though the upstream job
# failed - confirm whether that is intended.
task_failed = EmailOperator(
    dag=dag,
    task_id="task_failed",
    trigger_rule=TriggerRule.ONE_FAILED,
    to=["tek_newsletter@163.com"],
    # No cc recipients. An empty-string entry would be passed to the mailer
    # as a (blank) recipient address, so the argument is left unset instead.
    cc=None,
    subject="address_complete_failed",
    html_content='<h3>您好address_complete作业失败请及时处理</h3>',
)
# Remote job: change into the agents directory on the SSH host and run the
# address-completion script there.
# NOTE(review): the script can run for a long time; check that the SSH
# operator's command timeout of the installed provider version allows that.
address_complete = SSHOperator(
    dag=dag,
    task_id='address_complete',
    ssh_hook=sshHook,
    command='cd /data/airflow/etl/agents && python address_complete.py',
    params={'my_param': "address_complete"},
    retries=1,  # task-level value; takes precedence over default_args
    depends_on_past=False,
)

# The alert task sits directly downstream of the remote job.
address_complete.set_downstream(task_failed)

View File

@@ -0,0 +1,400 @@
import requests
import json
import psycopg2
from psycopg2.extras import RealDictCursor
import time
from typing import List, Dict, Any
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# Logging setup: INFO level, records formatted as "time - level - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the classes and functions in this file.
logger = logging.getLogger(__name__)
class APIClient:
    """Small HTTP client that POSTs JSON payloads to a single endpoint.

    Every ``call_api_*`` method returns a result dict instead of raising on
    transport or HTTP errors:

    * success: ``{"success": True, "status_code": int, "data": <parsed JSON>}``
    * failure: ``{"success": False, "error": str}``
    """

    def __init__(self, api_url: str, auth_token: str):
        """Store the endpoint and build the headers sent with every request.

        Args:
            api_url: URL every request is POSTed to.
            auth_token: Value sent verbatim in the ``Authorization`` header.
        """
        self.api_url = api_url
        self.headers = {
            'Authorization': auth_token,
            'Content-Type': 'application/json'
        }

    def _post(self, payload: Dict[str, Any], timeout: int, start_message: str,
              log_payload: bool = False) -> Dict[str, Any]:
        """Serialize *payload*, POST it and wrap the outcome in a result dict.

        Args:
            payload: JSON-serializable request body.
            timeout: Timeout in seconds handed to ``requests.post``.
            start_message: Text logged right before the request is sent.
            log_payload: When true, log the serialized body before sending.

        Returns:
            The success / failure dict described on the class.
        """
        body = json.dumps(payload)
        if log_payload:
            logger.info(f"调用APIpayload: {body}")
        try:
            logger.info(start_message)
            response = requests.post(
                self.api_url,
                headers=self.headers,
                data=body,
                timeout=timeout
            )
            response.raise_for_status()
            # Parse inside the try block: a body that is not valid JSON must
            # end up as a failure result, not as an uncaught exception.
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error(f"API调用失败: {e}")
            return {
                "success": False,
                "error": str(e)
            }
        logger.info(f"API调用成功状态码: {response.status_code}")
        return {
            "success": True,
            "status_code": response.status_code,
            "data": data
        }

    def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Call the API in blocking mode with the given ``inputs``.

        Args:
            inputs: Mapping sent as the ``inputs`` field of the payload.

        Returns:
            Result dict (see class docstring).
        """
        payload = {
            "inputs": inputs,
            "response_mode": "blocking",
            "user": "admin"
        }
        return self._post(payload, timeout=2400,
                          start_message="调用带inputs参数的API",
                          log_payload=True)

    def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Call the API in blocking mode with empty ``inputs``.

        Args:
            other_params: Optional extra top-level payload fields; they
                override the defaults on key collision.

        Returns:
            Result dict (see class docstring).
        """
        payload = {
            "inputs": {},
            "response_mode": "blocking",
            "user": "admin"
        }
        if other_params:
            payload.update(other_params)
        return self._post(payload, timeout=1200,
                          start_message="调用不带inputs参数的API")

    def call_api_with_query(self, query: str = None) -> Dict[str, Any]:
        """Call the API with a ``query`` string and empty ``inputs``.

        NOTE(review): the payload asks for ``response_mode`` "streaming" while
        the reply is parsed as one JSON document. If the server answers with
        an event stream, the call is reported as a failure - confirm the
        intended mode.

        Args:
            query: Text sent as the ``query`` field.

        Returns:
            Result dict (see class docstring).
        """
        payload = {
            "inputs": {},
            "query": query,
            "response_mode": "streaming",
            "user": "admin"
        }
        return self._post(payload, timeout=2400,
                          start_message="调用带inputs参数的API")

    def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Call the API with an empty ``query`` and empty ``inputs``.

        NOTE(review): same "streaming" vs. JSON parsing caveat as
        ``call_api_with_query``.

        Args:
            other_params: Optional extra top-level payload fields; they
                override the defaults on key collision.

        Returns:
            Result dict (see class docstring).
        """
        payload = {
            "inputs": {},
            "query": "",
            "response_mode": "streaming",
            "user": "admin"
        }
        if other_params:
            payload.update(other_params)
        return self._post(payload, timeout=2400,
                          start_message="调用不带inputs参数的API")
class DatabaseManager:
    """Owns one psycopg2 connection and runs read queries on it."""

    def __init__(self, db_config: Dict[str, str]):
        """Remember the connection settings; no connection is opened yet.

        Args:
            db_config: Mapping with optional keys ``host``, ``port``,
                ``database``, ``user`` and ``password``. Other keys are not
                read by this class.
        """
        self.db_config = db_config
        self.connection = None

    def connect(self):
        """Open the connection, with rows returned as dict-like records.

        Raises:
            Exception: whatever ``psycopg2.connect`` raised, after logging it.
        """
        try:
            self.connection = psycopg2.connect(
                host=self.db_config.get('host', 'localhost'),
                port=self.db_config.get('port', '5432'),
                database=self.db_config.get('database', 'postgres'),
                user=self.db_config.get('user', 'postgres'),
                password=self.db_config.get('password', ''),
                cursor_factory=RealDictCursor
            )
            logger.info("数据库连接成功")
        except Exception as e:
            logger.error(f"数据库连接失败: {e}")
            raise

    def disconnect(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.connection:
            self.connection.close()
            # Drop the closed handle so a later query reconnects instead of
            # reusing a connection that can no longer execute anything.
            self.connection = None
            logger.info("数据库连接已关闭")

    def get_urls_from_database(self, query: str = None) -> List[Dict[str, Any]]:
        """Run *query* and return its rows as plain dicts.

        Connects first when no connection is open. The dict keys are the
        column names (or aliases) selected by the query.

        Args:
            query: SQL text to execute; defaults to ``SELECT 1``.

        Returns:
            One dict per row, or an empty list when the query failed (the
            error is logged, not raised).
        """
        if not self.connection:
            self.connect()
        if query is None:
            query = "SELECT 1"
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()
            return [dict(row) for row in rows]
        except Exception as e:
            logger.error(f"查询数据库失败: {e}")
            return []
        finally:
            # psycopg2 opens a transaction on the first statement. Nothing in
            # this class commits, so roll it back right away: this clears an
            # aborted transaction after an error and keeps the session from
            # staying idle-in-transaction while the caller does other work.
            try:
                self.connection.rollback()
            except psycopg2.Error as e:
                logger.warning(f"rollback failed: {e}")
def _field_as_text(item, *keys):
    """Return the first non-None value stored under *keys* as a string, else ''."""
    for key in keys:
        value = item.get(key)
        if value is not None:
            return f"{value}"
    return ''


def process_single_item(api_client, item, type, item_index, total_count):
    """Send one database row to the API and return the API result dict.

    Args:
        api_client: Object offering ``call_api_with_inputs(dict)`` and
            ``call_api_with_query(str)``, each returning a dict with a
            ``success`` key.
        item: Row dict. The identifier is read from ``id`` (falling back to
            ``dw_account``) and the address from ``address`` (falling back to
            ``address_detail``), so rows work with or without column aliases.
        type: ``'workflow'`` sends the fields as inputs; any other value
            sends them as one query string.
        item_index: 1-based position of this row, used for logging only.
        total_count: Total number of rows, used for logging only.

    Returns:
        The dict returned by the API client, or
        ``{"success": False, "error": str}`` when an exception was raised.
    """
    logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item}")
    try:
        record_id = _field_as_text(item, 'id', 'dw_account')
        address = _field_as_text(item, 'address', 'address_detail')
        if type == 'workflow':
            result = api_client.call_api_with_inputs({
                'id': record_id,
                'address': address
            })
        else:
            # Agent mode: both fields are packed into a single query string.
            result = api_client.call_api_with_query(
                f"ID: {record_id}, 地址: {address}"
            )
        if result['success']:
            logger.info(f"数据 {item} 处理成功")
        else:
            logger.error(f"数据 {item} 处理失败: {result.get('error')}")
        return result
    except Exception as e:
        logger.error(f"处理数据 {item} 时发生异常: {e}")
        return {"success": False, "error": str(e)}
# DATABASE_URL=postgresql://<user>:<password>@<host>:5432/daas_mpp  (credentials redacted - keep secrets out of source control)
# DATABASE_SCHEMA=p70_ai_intelligence
def _process_items(api_client, items, mode, max_workers):
    """Run process_single_item for every row on a thread pool.

    Returns:
        Tuple ``(success_count, failed_count)``.
    """
    success_count = 0
    failed_count = 0
    total = len(items)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_item = {
            executor.submit(process_single_item, api_client, item, mode, index, total): item
            for index, item in enumerate(items, start=1)
        }
        for future in as_completed(future_to_item):
            item = future_to_item[future]
            try:
                result = future.result()
                if result.get("success"):
                    success_count += 1
                else:
                    failed_count += 1
            except Exception as e:
                logger.error(f"处理项目 {item} 时发生异常: {e}")
                failed_count += 1
    return success_count, failed_count


def main():
    """Fetch the pending address rows and push each one through the API.

    Returns:
        0 when everything succeeded (or there was nothing to do), 1 when the
        run raised an error or at least one row failed. The ``__main__``
        guard turns this into the process exit status so that the scheduler
        running the script can see a failed run.
    """
    import os  # local import: only needed for the environment overrides below

    # Number of concurrent API calls.
    MAX_WORKERS = 10

    # SECURITY: the literal fallbacks keep existing deployments working, but
    # credentials do not belong in source control. Set the environment
    # variables, rotate the secrets, then delete the literals.
    db_config = {
        'host': os.environ.get('ADDRESS_DB_HOST', '124.221.232.219'),
        'port': os.environ.get('ADDRESS_DB_PORT', '5432'),
        'database': os.environ.get('ADDRESS_DB_NAME', 'daas_mpp'),
        'user': os.environ.get('ADDRESS_DB_USER', 'dbuser_dba'),
        'password': os.environ.get('ADDRESS_DB_PASSWORD', 'EmBRxnmmjnE3'),
        'schema': 'p70_ai_intelligence'
    }
    api_config = {
        'url': os.environ.get('ADDRESS_API_URL',
                              'https://tk-agent.idgvalue.com/v1/workflows/run'),
        'auth_token': os.environ.get('ADDRESS_API_TOKEN',
                                     'Bearer app-T4vF1pdBNRPv7oLbaL0neQze')
    }
    api_client = APIClient(api_config['url'], api_config['auth_token'])

    # 'workflow' sends the fields as inputs; 'agent' sends one query string.
    mode = 'workflow'
    # True: process rows from the database. False: fire one parameterless
    # demo call against the API.
    use_database = True

    try:
        if not use_database:
            logger.info("调用不带inputs参数的API示例")
            if mode == 'workflow':
                demo_result = api_client.call_api_without_inputs()
            else:
                demo_result = api_client.call_api_without_query()
            if demo_result['success']:
                logger.info("不带inputs参数的API调用成功")
                return 0
            logger.error(f"不带inputs参数的API调用失败: {demo_result.get('error')}")
            return 1

        # Columns are aliased to the keys the worker sends to the API.
        # NOTE(review): NOT IN yields no rows at all if account_id can be
        # NULL in agent_address_complete - confirm the column is NOT NULL.
        custom_query = """
        select dw_account as id, address_detail as address
        from p99_temp.address_city
        where dw_account not in (select account_id from p70_ai_intelligence.agent_address_complete)
        """
        db_manager = DatabaseManager(db_config)
        try:
            items = db_manager.get_urls_from_database(custom_query)
        finally:
            # The connection is only needed for the fetch; release it before
            # the long-running API phase.
            db_manager.disconnect()

        if not items:
            logger.warning("未获取到URL")
            return 0

        logger.info(f"总共需要处理 {len(items)} 个项目,使用 {MAX_WORKERS} 个并发线程")
        success_count, failed_count = _process_items(api_client, items, mode, MAX_WORKERS)
        logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}")
        return 1 if failed_count else 0
    except Exception as e:
        logger.error(f"程序执行失败: {e}")
        return 1


if __name__ == "__main__":
    import sys

    sys.exit(main())