From 5c7be890d82e490c43b23ce2dc0b8e005849a071 Mon Sep 17 00:00:00 2001
From: root
Date: Mon, 8 Dec 2025 16:04:23 +0800
Subject: [PATCH] =?UTF-8?q?add=20workflow=20Append-=E5=AE=A2=E6=88=B7?=
 =?UTF-8?q?=E4=BF=A1=E6=81=AF=E8=A1=A5=E5=85=A8,dev?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Review note: the file contents were revised during review, so the blob ids
on the "index" lines below no longer match the new file contents.

 .../156_account_complete_append.py | 435 ++++++++++++++++++
 .../wf_dag_account_complete_append.py | 62 +++
 2 files changed, 497 insertions(+)
 create mode 100644 dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py
 create mode 100644 dev/workflow/TK_Cust/account_complete_append/Append-客户信息补全/wf_dag_account_complete_append.py

diff --git a/dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py b/dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py
new file mode 100644
index 0000000..6182c05
--- /dev/null
+++ b/dev/workflow/TK_Cust/account_complete_append/156-客户信息补全补充/156_account_complete_append.py
@@ -0,0 +1,435 @@
+"""Batch driver for the "156 account complete append" agent workflow.
+
+Reads candidate account rows from PostgreSQL, submits each row to the agent
+platform's HTTP API from a thread pool and logs a success/failure summary.
+The process exits non-zero when the run hit a fatal error or any row failed,
+so a scheduler that launches this script can detect the failure.
+"""
+import json
+import logging
+import os
+import sys
+import threading
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Any, Dict, List, Optional
+
+import psycopg2
+import requests
+from psycopg2.extras import RealDictCursor
+
+# Logging configuration.
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+# Number of rows submitted to the API concurrently.
+MAX_WORKERS = 10
+
+# Call style: 'workflow' (inputs payload) or 'agent' (query payload).
+APP_TYPE = 'workflow'
+
+# When False, main() only issues one demo call without database inputs.
+USE_DATABASE_INPUTS = True
+
+# Rows where the city differs between agent_account_info and the cleaned
+# account view, skipping names already in agent_account_info_append.
+# NOTE(review): "NOT IN (subquery)" matches nothing if the subquery returns
+# a NULL; confirm "search" is never NULL there or switch to NOT EXISTS.
+PENDING_ACCOUNTS_QUERY = """
+SELECT
+t2.name as company1,
+t2.city_name as city1,
+t2.address_detail as address1,
+
+t1.account as company2,
+t1.city as city2,
+t1.address as address2
+
+ FROM p70_ai_intelligence.agent_account_info t1
+ left join p30_common.v_sql_cleaned_cn_d_account_info t2
+ on t2."name" = t1."search"
+ where t1.city <> t2.city_name
+ and t2.name not in (select search from p70_ai_intelligence.agent_account_info_append)
+
+ limit 100
+"""
+
+
+class APIClient:
+    """Small HTTP client for the agent platform's run endpoint.
+
+    Every ``call_api_*`` method POSTs a JSON payload to ``api_url`` and
+    returns a result dict: ``{"success": True, "status_code": ..., "data": ...}``
+    on success, or ``{"success": False, "error": ...}`` (plus ``status_code``
+    when the server answered with an HTTP error status) on failure.
+    """
+
+    def __init__(self, api_url: str, auth_token: str):
+        self.api_url = api_url
+        self.headers = {
+            'Authorization': auth_token,
+            'Content-Type': 'application/json'
+        }
+
+    @staticmethod
+    def _parse_body(response: requests.Response) -> Any:
+        """Return the decoded JSON body, ``{}`` if empty, else the raw text."""
+        if not response.content:
+            return {}
+        try:
+            return response.json()
+        except ValueError:
+            # A body that is not one JSON document must not turn a successful
+            # HTTP call into a failure (presumably the case for
+            # response_mode="streaming" -- confirm against the API docs).
+            return response.text
+
+    def _post(self, payload: Dict[str, Any], timeout: int, label: str) -> Dict[str, Any]:
+        """POST ``payload`` as JSON and normalise the outcome.
+
+        Args:
+            payload: request body, serialised to JSON by ``requests``.
+            timeout: request timeout in seconds.
+            label: log line emitted before the request is sent.
+
+        Returns:
+            The result dict described in the class docstring.
+        """
+        try:
+            logger.info(label)
+            response = requests.post(
+                self.api_url,
+                headers=self.headers,
+                json=payload,
+                timeout=timeout
+            )
+
+            logger.info(f"API调用完成,状态码: {response.status_code}")
+            # Only the first 500 characters, to keep the log readable.
+            logger.info(f"Response content: {response.text[:500]}")
+
+            response.raise_for_status()
+            logger.info(f"API调用成功,状态码: {response.status_code}")
+
+            return {
+                "success": True,
+                "status_code": response.status_code,
+                "data": self._parse_body(response)
+            }
+
+        except requests.exceptions.HTTPError as e:
+            logger.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
+            return {
+                "success": False,
+                "error": f"HTTP {e.response.status_code}: {e.response.text}",
+                "status_code": e.response.status_code
+            }
+        except requests.exceptions.RequestException as e:
+            logger.error(f"API调用失败: {e}")
+            return {
+                "success": False,
+                "error": str(e)
+            }
+
+    def call_api_with_inputs(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
+        """Run the application in blocking mode with the given ``inputs``.
+
+        Args:
+            inputs: value of the payload's ``inputs`` field.
+
+        Returns:
+            The result dict described in the class docstring.
+        """
+        payload = {
+            "inputs": inputs,
+            "response_mode": "blocking",
+            "user": "admin"
+        }
+        logger.info(f"调用API,payload: {json.dumps(payload, ensure_ascii=False)}")
+        return self._post(payload, timeout=300, label="调用带inputs参数的API")
+
+    def call_api_without_inputs(self, other_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
+        """Run the application in blocking mode with empty ``inputs``.
+
+        Args:
+            other_params: extra top-level payload fields; they override the
+                defaults on key collision.
+
+        Returns:
+            The result dict described in the class docstring.
+        """
+        payload = {
+            "inputs": {},
+            "response_mode": "blocking",
+            "user": "admin"
+        }
+        if other_params:
+            payload.update(other_params)
+        return self._post(payload, timeout=300, label="调用不带inputs参数的API")
+
+    def call_api_with_query(self, query: Optional[str] = None) -> Dict[str, Any]:
+        """Send ``query`` with empty ``inputs`` in streaming mode.
+
+        Args:
+            query: value of the payload's ``query`` field.
+
+        Returns:
+            The result dict described in the class docstring.
+        """
+        payload = {
+            "inputs": {},
+            "query": query,
+            "response_mode": "streaming",
+            "user": "admin"
+        }
+        return self._post(payload, timeout=300, label="调用带query参数的API")
+
+    def call_api_without_query(self, other_params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
+        """Send an empty ``query`` with empty ``inputs`` in streaming mode.
+
+        Args:
+            other_params: extra top-level payload fields; they override the
+                defaults on key collision.
+
+        Returns:
+            The result dict described in the class docstring.
+        """
+        payload = {
+            "inputs": {},
+            "query": "",
+            "response_mode": "streaming",
+            "user": "admin"
+        }
+        if other_params:
+            payload.update(other_params)
+        # This call is given a much longer timeout than the others.
+        return self._post(payload, timeout=2400, label="调用不带query参数的API")
+
+
+class DatabaseManager:
+    """Owns one psycopg2 connection built from a plain config dict."""
+
+    def __init__(self, db_config: Dict[str, str]):
+        self.db_config = db_config
+        self.connection = None
+
+    def connect(self):
+        """Open the connection; rows are returned as dicts (RealDictCursor).
+
+        Raises:
+            Exception: whatever ``psycopg2.connect`` raised, after logging it.
+        """
+        try:
+            self.connection = psycopg2.connect(
+                host=self.db_config.get('host', 'localhost'),
+                port=self.db_config.get('port', '5432'),
+                database=self.db_config.get('database', 'postgres'),
+                user=self.db_config.get('user', 'postgres'),
+                password=self.db_config.get('password', ''),
+                cursor_factory=RealDictCursor
+            )
+            logger.info("数据库连接成功")
+        except Exception as e:
+            logger.error(f"数据库连接失败: {e}")
+            raise
+
+    def disconnect(self):
+        """Close the connection if one is open."""
+        if self.connection:
+            self.connection.close()
+            # Forget the closed handle so a later query reconnects instead
+            # of running against a closed connection.
+            self.connection = None
+            logger.info("数据库连接已关闭")
+
+    def get_urls_from_database(self, query: Optional[str] = None) -> List[Dict[str, Any]]:
+        """Run ``query`` and return every row as a plain dict.
+
+        Connects first when no connection is open.
+
+        Args:
+            query: SQL to execute; defaults to ``SELECT 1``.
+
+        Returns:
+            One dict per row keyed by column name, or ``[]`` when the query
+            fails (the error is logged, not raised).
+        """
+        if not self.connection:
+            self.connect()
+
+        if query is None:
+            query = "SELECT 1"
+
+        try:
+            with self.connection.cursor() as cursor:
+                cursor.execute(query)
+                return [dict(row) for row in cursor.fetchall()]
+        except Exception as e:
+            logger.error(f"查询数据库失败: {e}")
+            return []
+
+
+def _as_text(value: Any) -> str:
+    """Return ``value`` as a string, mapping ``None`` (SQL NULL) to ''."""
+    return '' if value is None else str(value)
+
+
+def process_single_item(api_client, item, type, item_index, total_count):
+    """Submit one database row to the API and return the API result dict.
+
+    Args:
+        api_client: an ``APIClient``.
+        item: row dict. Workflow mode reads company1/address1/city1 and
+            company2/address2/city2; any other mode reads id and address.
+        type: 'workflow' to call ``call_api_with_inputs``; anything else
+            calls ``call_api_with_query``. (Shadows the builtin, but the
+            name is part of the existing signature.)
+        item_index: 1-based position of ``item``, used for logging only.
+        total_count: total number of items, used for logging only.
+
+    Returns:
+        The client's result dict, or ``{"success": False, "error": ...}``
+        if an unexpected exception was raised.
+    """
+    logger.info(f"开始处理第 {item_index}/{total_count} 个数据: {item}")
+
+    try:
+        if type == 'workflow':
+            # NULL columns are sent as '' rather than the literal "None".
+            fields = ('company1', 'address1', 'city1',
+                      'company2', 'address2', 'city2')
+            inputs_data = {name: _as_text(item.get(name, '')) for name in fields}
+            result = api_client.call_api_with_inputs(inputs_data)
+        else:
+            # agent: flatten the row into a single query string.
+            query = f"ID: {_as_text(item.get('id', ''))}, 地址: {_as_text(item.get('address', ''))}"
+            result = api_client.call_api_with_query(query)
+
+        if result['success']:
+            logger.info(f"数据 {item} 处理成功")
+        else:
+            logger.error(f"数据 {item} 处理失败: {result.get('error')}")
+        return result
+
+    except Exception as e:
+        logger.error(f"处理数据 {item} 时发生异常: {e}")
+        return {"success": False, "error": str(e)}
+
+
+def _load_db_config() -> Dict[str, str]:
+    """Build the database config; ACCOUNT_APPEND_DB_* variables override.
+
+    NOTE(review): the fallback values, including the password, are committed
+    in source. Rotate the credential and drop the fallbacks once the
+    environment variables are provisioned.
+    """
+    env = os.environ.get
+    return {
+        'host': env('ACCOUNT_APPEND_DB_HOST', '124.221.232.219'),
+        'port': env('ACCOUNT_APPEND_DB_PORT', '5432'),
+        'database': env('ACCOUNT_APPEND_DB_NAME', 'daas_mpp'),
+        'user': env('ACCOUNT_APPEND_DB_USER', 'dbuser_dba'),
+        'password': env('ACCOUNT_APPEND_DB_PASSWORD', 'EmBRxnmmjnE3'),
+        'schema': env('ACCOUNT_APPEND_DB_SCHEMA', 'p70_ai_intelligence')
+    }
+
+
+def _load_api_config() -> Dict[str, str]:
+    """Build the API config; ACCOUNT_APPEND_API_* variables override.
+
+    NOTE(review): the fallback bearer token is committed in source; rotate it
+    and drop the fallback once the environment variables are provisioned.
+    """
+    env = os.environ.get
+    return {
+        'url': env('ACCOUNT_APPEND_API_URL', 'https://agent.idgvalue.com/v1/workflows/run'),
+        'auth_token': env('ACCOUNT_APPEND_API_TOKEN', 'Bearer app-Kh8oXk9zFYxCUBzjVxKcQ7kp')
+    }
+
+
+def _run_batch(api_client: APIClient, db_config: Dict[str, str]) -> bool:
+    """Fetch the pending rows and process them concurrently.
+
+    Returns:
+        True when no row failed (including "no rows to process"), False
+        when at least one row failed.
+    """
+    db_manager = DatabaseManager(db_config)
+    try:
+        rows = db_manager.get_urls_from_database(PENDING_ACCOUNTS_QUERY)
+    finally:
+        # Every row is already in memory, so the connection is not held
+        # open while the API calls run.
+        db_manager.disconnect()
+
+    if not rows:
+        logger.warning("未获取到URL")
+        return True
+
+    total = len(rows)
+    logger.info(f"总共需要处理 {total} 个项目,使用 {MAX_WORKERS} 个并发线程")
+
+    success_count = 0
+    failed_count = 0
+    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        future_to_item = {
+            executor.submit(process_single_item, api_client, row, APP_TYPE, i, total): row
+            for i, row in enumerate(rows, start=1)
+        }
+        for future in as_completed(future_to_item):
+            row = future_to_item[future]
+            try:
+                succeeded = future.result().get("success")
+            except Exception as e:
+                logger.error(f"处理项目 {row} 时发生异常: {e}")
+                succeeded = False
+            if succeeded:
+                success_count += 1
+            else:
+                failed_count += 1
+
+    logger.info(f"处理完成。成功: {success_count}, 失败: {failed_count}")
+    return failed_count == 0
+
+
+def _run_demo_call(api_client: APIClient) -> bool:
+    """Issue one call without database inputs; return whether it succeeded."""
+    logger.info("调用不带inputs参数的API示例")
+    if APP_TYPE == 'workflow':
+        result = api_client.call_api_without_inputs()
+    else:
+        result = api_client.call_api_without_query()
+
+    if result['success']:
+        logger.info("不带inputs参数的API调用成功")
+    else:
+        logger.error(f"不带inputs参数的API调用失败: {result.get('error')}")
+    return bool(result['success'])
+
+
+def main() -> int:
+    """Run the job and return the process exit status.
+
+    Returns:
+        0 when everything succeeded, 1 on a fatal error or any failed row.
+    """
+    api_config = _load_api_config()
+    api_client = APIClient(api_config['url'], api_config['auth_token'])
+
+    try:
+        if USE_DATABASE_INPUTS:
+            ok = _run_batch(api_client, _load_db_config())
+        else:
+            ok = _run_demo_call(api_client)
+    except Exception as e:
+        logger.exception(f"程序执行失败: {e}")
+        return 1
+    return 0 if ok else 1
+
+
+if __name__ == "__main__":
+    # Exit non-zero on failure so the launching task is marked failed.
+    sys.exit(main())
diff --git a/dev/workflow/TK_Cust/account_complete_append/Append-客户信息补全/wf_dag_account_complete_append.py b/dev/workflow/TK_Cust/account_complete_append/Append-客户信息补全/wf_dag_account_complete_append.py
new file mode 100644
index 0000000..10135a0
--- /dev/null
+++ b/dev/workflow/TK_Cust/account_complete_append/Append-客户信息补全/wf_dag_account_complete_append.py
@@ -0,0 +1,62 @@
+#!/usr/bin/python
+# -*- encoding=utf-8 -*-
+"""Airflow DAG: run 156_account_complete_append.py over SSH, e-mail on failure."""
+from airflow import DAG
+from datetime import datetime, timedelta
+from airflow.contrib.hooks.ssh_hook import SSHHook
+from airflow.contrib.operators.ssh_operator import SSHOperator
+from airflow.sensors.external_task_sensor import ExternalTaskSensor
+import json
+
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.operators.email_operator import EmailOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+
+sshHook = SSHHook(ssh_conn_id='ssh_air')
+default_args = {
+    'owner': 'tek_newsletter@163.com',
+    'email_on_failure': True,
+    'email_on_retry': True,
+    'start_date': datetime(2024, 1, 1),
+    'depends_on_past': False,
+    'retries': 6,
+    'retry_delay': timedelta(minutes=10),
+}
+
+dag = DAG(
+    'wf_dag_account_complete_append',
+    default_args=default_args,
+    schedule=None,
+    catchup=False,
+    dagrun_timeout=timedelta(minutes=600),
+    max_active_runs=3,
+    tags=['64cbd1bbced14209b5e3a879f89e8ab1', 'TK_Cust', 'Append-客户信息补全'],
+)
+
+# Notification task: fires once an upstream task has failed (ONE_FAILED).
+task_failed = EmailOperator(
+    dag=dag,
+    trigger_rule=TriggerRule.ONE_FAILED,
+    task_id="task_failed",
+    to=["tek_newsletter@163.com"],
+    cc=[""],
+    subject="account_complete_append_failed",
+    # A single-quoted literal cannot span physical lines, so the line
+    # breaks of the original message are written as \n escapes.
+    html_content='\n\n您好,account_complete_append作业失败,请及时处理"\n\n',
+)
+
+# A Python name cannot start with a digit, so only the variable is
+# prefixed; the task_id keeps its original value.
+task_156_account_complete_append = SSHOperator(
+    ssh_hook=sshHook,
+    task_id='156_account_complete_append',
+    command='cd /data/airflow/etl/agents && python 156_account_complete_append.py',
+    params={'my_param': "156_account_complete_append"},
+    depends_on_past=False,
+    retries=1,
+    dag=dag,
+)
+
+task_156_account_complete_append >> task_failed