diff --git a/dev/workflow/TK_Cust/address_complete/客户地址信息提取/wf_dag_address_complete.py b/dev/workflow/TK_Cust/address_complete/客户地址信息提取/wf_dag_address_complete.py
new file mode 100644
index 0000000..da63b3e
--- /dev/null
+++ b/dev/workflow/TK_Cust/address_complete/客户地址信息提取/wf_dag_address_complete.py
@@ -0,0 +1,53 @@
+#!/usr/bin/python
+# -*- encoding=utf-8 -*-
+from airflow import DAG
+from datetime import datetime, timedelta
+from airflow.contrib.hooks.ssh_hook import SSHHook
+from airflow.contrib.operators.ssh_operator import SSHOperator
+from airflow.sensors.external_task_sensor import ExternalTaskSensor
+import json
+
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.operators.email_operator import EmailOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+
+sshHook = SSHHook(ssh_conn_id ='ssh_air')
+default_args = {
+'owner': 'tek_newsletter@163.com',
+'email_on_failure': True,
+'email_on_retry':True,
+'start_date': datetime(2024, 1, 1),
+'depends_on_past': False,
+'retries': 6,
+'retry_delay': timedelta(minutes=10),
+}
+
+dag = DAG('wf_dag_address_complete', default_args=default_args,
+schedule=None,
+catchup=False,
+dagrun_timeout=timedelta(minutes=1200),
+max_active_runs=3,
+tags=['64cbd1bbced14209b5e3a879f89e8ab1','TK_Cust','客户地址信息提取']
+)
+
+task_failed = EmailOperator (
+ dag=dag,
+ trigger_rule=TriggerRule.ONE_FAILED,
+ task_id="task_failed",
+ to=["tek_newsletter@163.com"],
+ cc=[""],
+ subject="address_complete_failed",
+ html_content='
您好,address_complete作业失败,请及时处理"
')
+
+
+address_complete = SSHOperator(
+ssh_hook=sshHook,
+task_id='address_complete',
+command='cd /data/airflow/etl/agents && python address_complete.py',
+params={'my_param':"address_complete"},
+depends_on_past=False,
+retries=1,
+dag=dag)
+
+address_complete >> task_failed
diff --git a/dev/workflow/TK_Cust/address_complete/客户地址城市解析/address_complete.py b/dev/workflow/TK_Cust/address_complete/客户地址城市解析/address_complete.py
new file mode 100644
index 0000000..3e90dc3
--- /dev/null
+++ b/dev/workflow/TK_Cust/address_complete/客户地址城市解析/address_complete.py
@@ -0,0 +1,400 @@
+import requests
+import json
+import psycopg2
+from psycopg2.extras import RealDictCursor
+import time
+from typing import List, Dict, Any
+import logging
+from concurrent.futures import ThreadPoolExecutor, as_completed
+import threading
+
+# 配置日志
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+class APIClient:
+ """简化的API客户端类"""
+
+ def __init__(self, api_url: str, auth_token: str):
+ self.api_url = api_url
+ self.headers = {
+ 'Authorization': auth_token,
+ 'Content-Type': 'application/json'
+ }
+
+ def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
+ """直接调用带inputs参数的API
+
+ Args:
+ inputs: inputs参数字典
+
+ Returns:
+ API响应数据
+ """
+ payload = json.dumps({
+ "inputs": inputs,
+ "response_mode": "blocking",
+ "user": "admin"
+ })
+ logger.info(f"调用API,payload: {payload}")
+
+ try:
+ logger.info("调用带inputs参数的API")
+ response = requests.post(
+ self.api_url,
+ headers=self.headers,
+ data=payload,
+ timeout=2400
+ )
+
+ response.raise_for_status()
+ logger.info(f"API调用成功,状态码: {response.status_code}")
+
+ return {
+ "success": True,
+ "status_code": response.status_code,
+ "data": response.json()
+ }
+
+ except requests.exceptions.RequestException as e:
+ logger.error(f"API调用失败: {e}")
+ return {
+ "success": False,
+ "error": str(e)
+ }
+
+
+ def call_api_without_inputs(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
+ """直接调用不带inputs参数的API
+
+ Args:
+ other_params: 其他参数字典
+
+ Returns:
+ API响应数据
+ """
+ payload = {
+ "inputs": {},
+ "response_mode": "blocking",
+ "user": "admin"
+ }
+
+ # 添加其他参数
+ if other_params:
+ payload.update(other_params)
+
+ try:
+ logger.info("调用不带inputs参数的API")
+ response = requests.post(
+ self.api_url,
+ headers=self.headers,
+ data=json.dumps(payload),
+ timeout=1200
+ )
+
+ response.raise_for_status()
+ logger.info(f"API调用成功,状态码: {response.status_code}")
+
+ return {
+ "success": True,
+ "status_code": response.status_code,
+ "data": response.json()
+ }
+
+ except requests.exceptions.RequestException as e:
+ logger.error(f"API调用失败: {e}")
+ return {
+ "success": False,
+ "error": str(e)
+ }
+
+
+ def call_api_with_query(self, query: str = None) -> Dict[str, Any]:
+ """直接调用带inputs参数的API
+
+ Args:
+ query: 查询语句
+
+ Returns:
+ API响应数据
+ """
+ payload = json.dumps({
+ "inputs": {},
+ "query" : query,
+ "response_mode": "streaming",
+ "user": "admin"
+ })
+
+ try:
+ logger.info("调用带inputs参数的API")
+ response = requests.post(
+ self.api_url,
+ headers=self.headers,
+ data=payload,
+ timeout=2400
+ )
+
+ response.raise_for_status()
+ logger.info(f"API调用成功,状态码: {response.status_code}")
+
+ return {
+ "success": True,
+ "status_code": response.status_code,
+ "data": response.json()
+ }
+
+ except requests.exceptions.RequestException as e:
+ logger.error(f"API调用失败: {e}")
+ return {
+ "success": False,
+ "error": str(e)
+ }
+
+ def call_api_without_query(self, other_params: Dict[str, Any] = None) -> Dict[str, Any]:
+ """直接调用不带inputs参数的API
+
+ Args:
+ other_params: 其他参数字典
+
+ Returns:
+ API响应数据
+ """
+ payload = {
+ "inputs": {},
+ "query":"",
+ "response_mode": "streaming",
+ "user": "admin"
+ }
+
+ # 添加其他参数
+ if other_params:
+ payload.update(other_params)
+
+ try:
+ logger.info("调用不带inputs参数的API")
+ response = requests.post(
+ self.api_url,
+ headers=self.headers,
+ data=json.dumps(payload),
+ timeout=2400
+ )
+
+ response.raise_for_status()
+ logger.info(f"API调用成功,状态码: {response.status_code}")
+
+ return {
+ "success": True,
+ "status_code": response.status_code,
+ "data": response.json()
+ }
+
+ except requests.exceptions.RequestException as e:
+ logger.error(f"API调用失败: {e}")
+ return {
+ "success": False,
+ "error": str(e)
+ }
+
+class DatabaseManager:
+ """数据库管理类"""
+
+ def __init__(self, db_config: Dict[str, str]):
+ self.db_config = db_config
+ self.connection = None
+
+ def connect(self):
+ """连接数据库"""
+ try:
+ self.connection = psycopg2.connect(
+ host=self.db_config.get('host', 'localhost'),
+ port=self.db_config.get('port', '5432'),
+ database=self.db_config.get('database', 'postgres'),
+ user=self.db_config.get('user', 'postgres'),
+ password=self.db_config.get('password', ''),
+ cursor_factory=RealDictCursor
+ )
+ logger.info("数据库连接成功")
+ except Exception as e:
+ logger.error(f"数据库连接失败: {e}")
+ raise
+
+ def disconnect(self):
+ """断开数据库连接"""
+ if self.connection:
+ self.connection.close()
+ logger.info("数据库连接已关闭")
+
+ def get_urls_from_database(self, query: str = None) -> List[Dict[str, Any]]:
+ """从数据库获取数据列表"""
+ if not self.connection:
+ self.connect()
+
+ # 默认查询语句,根据实际情况修改
+ if query is None:
+ query = "SELECT 1"
+
+ try:
+ with self.connection.cursor() as cursor:
+ cursor.execute(query)
+ results = cursor.fetchall()
+
+ # 直接返回字典列表,每个字典包含 id 和 address
+ return [dict(row) for row in results]
+
+ except Exception as e:
+ logger.error(f"查询数据库失败: {e}")
+ return []
+
+def process_single_item(api_client, item, type, item_index, total_count):
+ """处理单个项目"""
+ logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item}")
+
+ try:
+ if type == 'workflow':
+ # item 是字典,包含 id 和 address
+ inputs_data = {
+ 'id': f"{item.get('id', '')}",
+ 'address': f"{item.get('address', '')}"
+ }
+
+ result = api_client.call_api_with_inputs(inputs_data)
+
+ if result['success']:
+ logger.info(f"数据 {item} 处理成功")
+ else:
+ logger.error(f"数据 {item} 处理失败: {result.get('error')}")
+ return result
+
+ else:
+ #agent
+ # 将字典转换为字符串查询
+ query = f"ID: {item.get('id', '')}, 地址: {item.get('address', '')}"
+ result = api_client.call_api_with_query(query)
+
+ if result['success']:
+ logger.info(f"数据 {item} 处理成功")
+ else:
+ logger.error(f"数据 {item} 处理失败: {result.get('error')}")
+ return result
+
+ except Exception as e:
+ logger.error(f"处理数据 {item} 时发生异常: {e}")
+ return {"success": False, "error": str(e)}
+
+#DATABASE_URL=postgresql://dbuser_dba:EmBRxnmmjnE3@124.221.232.219:5432/daas_mpp
+#DATABASE_SCHEMA=p70_ai_intelligence
+def main():
+ """主函数"""
+ # 配置并发数
+ MAX_WORKERS = 10 # 可调整为5或10
+
+ # 数据库配置
+ db_config = {
+ 'host': '124.221.232.219',
+ 'port': '5432',
+ 'database': 'daas_mpp',
+ 'user': 'dbuser_dba',
+ 'password': 'EmBRxnmmjnE3',
+ 'schema': 'p70_ai_intelligence'
+ }
+
+ # API配置
+ api_config = {
+ 'url': 'https://tk-agent.idgvalue.com/v1/workflows/run',
+ 'auth_token': 'Bearer app-T4vF1pdBNRPv7oLbaL0neQze'
+ }
+
+ api_client = APIClient(api_config['url'], api_config['auth_token'])
+
+ type = 'agent'
+ type = 'workflow'
+
+ try:
+ flag = True
+
+ if flag:
+ # 初始化
+ db_manager = DatabaseManager(db_config)
+ custom_query = """
+select dw_account,address_detail from p99_temp.address_city
+where dw_account not in (select account_id from p70_ai_intelligence.agent_address_complete)
+
+"""
+
+ try:
+ # 从数据库获取URL列表
+ list = db_manager.get_urls_from_database(custom_query)
+
+ if not list:
+ logger.warning("未获取到URL")
+ return
+
+ # 展平所有数据项为单个列表
+ # all_items = []
+ # for batch in list:
+ # items = batch.split('\n')
+ # all_items.extend(items)
+
+ logger.info(f"总共需要处理 {len(list)} 个项目,使用 {MAX_WORKERS} 个并发线程")
+
+ # 使用线程池并发处理
+ success_count = 0
+ failed_count = 0
+
+ with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+ # 提交所有任务
+ future_to_item = {
+ executor.submit(process_single_item, api_client, item, type, i+1, len(list)): item
+ for i, item in enumerate(list)
+ }
+
+ # 收集结果
+ for future in as_completed(future_to_item):
+ item = future_to_item[future]
+ try:
+ result = future.result()
+ if result.get("success"):
+ success_count += 1
+ else:
+ failed_count += 1
+ except Exception as e:
+ logger.error(f"处理项目 {item} 时发生异常: {e}")
+ failed_count += 1
+
+ logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}")
+
+ except Exception as e:
+ logger.error(f"程序执行失败: {e}")
+
+ finally:
+ db_manager.disconnect()
+
+ else:
+ logger.info("调用不带inputs参数的API示例")
+ if type == 'workflow':
+ result2 = api_client.call_api_without_inputs()
+
+ if result2['success']:
+ logger.info("不带inputs参数的API调用成功")
+ else:
+ logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
+
+ else:
+ #agent
+ result2 = api_client.call_api_without_query()
+
+ if result2['success']:
+ logger.info("不带inputs参数的API调用成功")
+ else:
+ logger.error(f"不带inputs参数的API调用失败: {result2.get('error')}")
+
+ except Exception as e:
+ logger.error(f"初始化失败: {e}")
+
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file