import os
import random
import datetime
import argparse

import pandas as pd
import psycopg2
import psycopg2.extras
from sqlalchemy import create_engine, text


def export_view_to_csv(view_name, output_file, where_condition, db_config, output_path):
    """Export every row of a database view to a CSV file.

    Args:
        view_name: name of the view (may be schema-qualified) to export.
        output_file: file name of the CSV to create inside ``output_path``.
        where_condition: optional ``"WHERE ..."`` clause; blank/None means no filter.
        db_config: dict with keys dbname/user/password/host/port (from env vars).
        output_path: directory the CSV is written to; created if it does not exist.

    Raises:
        ValueError: if any ``db_config`` entry is None (unset environment variable).
    """
    # Fail fast when an expected environment variable was not provided.
    for key, value in db_config.items():
        if value is None:
            raise ValueError(f"Environment variable {key} is not set.")

    db_uri = (
        f"postgresql://{db_config['user']}:{db_config['password']}"
        f"@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
    )
    engine = create_engine(db_uri)

    if not where_condition or not where_condition.strip():
        where_condition = " WHERE 1=1 "

    # NOTE(review): view_name and where_condition are interpolated directly into
    # the SQL text. They come from the dp_custom.s_v_email_distribute config
    # table, not from end users, but this is not injection-safe — confirm that
    # table stays trusted. Identifiers cannot be bound parameters, so this
    # cannot simply be parameterized.
    query = f"""
    SELECT * FROM {view_name} {where_condition};
    """

    try:
        df = pd.read_sql_query(query, engine)
        # exist_ok avoids the check-then-create race the old exists()/makedirs had.
        os.makedirs(output_path, exist_ok=True)
        df.to_csv(os.path.join(output_path, output_file), index=False)
    finally:
        engine.dispose()  # release pooled connections even if the query fails


def main(mail_task_id, dops_db_config, mpp_db_config, send_from):
    """Run one export-and-mail task end to end.

    Looks up the mail task, exports its view to a CSV under a per-mail
    directory, then queues an e-mail record referencing the attachment.

    Args:
        mail_task_id: id of the row in dp_custom.s_v_email_distribute to run.
        dops_db_config: connection settings for the dataops (mail queue) DB.
        mpp_db_config: connection settings for the DB holding the export view.
        send_from: sender address written into the mail queue record.
    """
    mail_task_result = get_email_info(mail_task_id, dops_db_config)
    print("邮件任务信息:", mail_task_result)
    # BUG FIX: fetchall() returns [] (never None) when no row matches, so the
    # old `== None` check could not fire and mail_task_result[0] raised
    # IndexError. Truthiness covers both [] and None.
    if not mail_task_result:
        print(f'未找到邮件任务:{mail_task_id}')
        return  # was exit(); returning keeps callers/tests usable, same CLI effect

    # NOTE(review): only the first row is used although the view exposes a
    # `sort` column — confirm multi-row tasks are intentionally ignored.
    mail_task = mail_task_result[0]
    view_name = mail_task['table_name']
    send_to = mail_task['send_to']
    title = mail_task['title']
    content = mail_task['content']
    where_condition = mail_task['condition']
    output_file = mail_task['file_name']
    output_path = '/data/dops/app/dp-mail/tempFile'

    now = datetime.datetime.now()
    # mail_id = timestamp + 4 random digits, e.g. 202407311419590042; also used
    # as the per-mail subdirectory name so concurrent runs do not collide.
    mail_id = now.strftime("%Y%m%d%H%M%S") + '{:04d}'.format(random.randint(0, 9999))
    # Substitute the {date} placeholder that task titles may contain.
    title = title.replace('{date}', now.strftime("%Y-%m-%d %H:%M:%S"))

    file_dict_path = os.path.join(output_path, mail_id)

    export_view_to_csv(view_name, output_file, where_condition, mpp_db_config, file_dict_path)
    insert_mail_record(send_from, send_to, title, content, dops_db_config, mail_id,
                       attachment=output_file)
def insert_mail_record(send_from, send_to, subject, mail_content, db_config, mail_id, **kwargs):
    """Queue an outbound e-mail by inserting one row into dp_email.mail_queue.

    Args:
        send_from / send_to: sender and recipient addresses.
        subject: mail subject line.
        mail_content: mail body.
        db_config: dict with keys dbname/user/password/host/port.
        mail_id: value stored in the queue table's ``id`` column.
        **kwargs: optional columns — send_cc, send_bcc, attachment,
            send_status (default '0'), src_system, create_tm/upt_tm
            (default: now), create_person/upt_person, remark.
    """
    db_uri = (
        f"postgresql://{db_config['user']}:{db_config['password']}"
        f"@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
    )
    engine = create_engine(db_uri)

    # Single timestamp so create_tm and upt_tm default to the same instant.
    now = datetime.datetime.now()
    new_record = {
        'mail_id': mail_id,
        'send_from': send_from,
        'send_to': send_to,
        'send_cc': kwargs.get('send_cc'),
        'send_bcc': kwargs.get('send_bcc'),
        'subject': subject,
        'mail_content': mail_content,
        'attachment': kwargs.get('attachment'),
        'send_status': kwargs.get('send_status', '0'),  # '0' presumably = pending; verify consumer
        'src_system': kwargs.get('src_system'),
        'create_tm': kwargs.get('create_tm', now),
        'create_person': kwargs.get('create_person'),
        'upt_tm': kwargs.get('upt_tm', now),
        'upt_person': kwargs.get('upt_person'),
        'remark': kwargs.get('remark'),
    }
    print(f'插入邮件队列记录:{new_record}')

    insert_stmt = text("""
        INSERT INTO dp_email.mail_queue (
            id, send_from, send_to, send_cc, send_bcc, subject,
            mail_content, attachment, send_status, src_system,
            create_tm, create_person, upt_tm, upt_person, remark
        ) VALUES (
            :mail_id, :send_from, :send_to, :send_cc, :send_bcc, :subject,
            :mail_content, :attachment, :send_status, :src_system,
            :create_tm, :create_person, :upt_tm, :upt_person, :remark
        )
    """)

    # BUG FIX: engine.connect() + execute never committed; under SQLAlchemy 2.x
    # (and 1.4 future mode) the implicit transaction rolls back on close and the
    # row is silently dropped. engine.begin() commits on success and rolls back
    # on error. The old `except Exception as e: raise e` was a no-op and is gone.
    try:
        with engine.begin() as connection:
            connection.execute(insert_stmt, new_record)
    finally:
        engine.dispose()


def get_email_info(mail_task_id, dops_db_config):
    """Fetch the active mail-task rows for ``mail_task_id``.

    Returns:
        list of dict-like rows (RealDictCursor) from
        dp_custom.s_v_email_distribute where valid_ind='1'; an EMPTY LIST
        (not None) when the id is unknown.
    """
    print(f'获取id为:{mail_task_id} 邮件任务信息')

    query = """
    SELECT
        mail_task_id
        , send_to
        , title
        , content
        , table_name
        , condition
        , file_name
        , valid_ind
        , sort
    FROM dp_custom.s_v_email_distribute
    WHERE mail_task_id = %s and valid_ind='1';
    """

    conn = psycopg2.connect(**dops_db_config)
    try:
        # Cursor context manager closes the cursor even if execute() raises;
        # the old code leaked both cursor and connection on any query error.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, (mail_task_id,))
            return cur.fetchall()
    finally:
        # psycopg2's `with conn:` only commits/rolls back — it does NOT close,
        # so close explicitly here.
        conn.close()


if __name__ == "__main__":
    # Environment-specific connection settings, supplied per deployment.
    mpp_db_config = {
        'dbname': os.getenv('DATABASENAME'),
        'user': os.getenv('USERNAME'),
        'password': os.getenv('PGPASSWORD'),
        'host': os.getenv('DBIP'),
        'port': os.getenv('PORT'),
    }

    # Same cluster credentials, fixed database for the dataops/mail-queue DB.
    dops_db_config = dict(mpp_db_config, dbname='dataops_db')

    send_from = 'tek_newsletter@163.com'

    # FIX: help text said "Excel" but the tool exports CSV.
    parser = argparse.ArgumentParser(description="Export PostgreSQL Data to CSV")
    parser.add_argument("mail_task_id", help="ID of the Mail Task to export")
    args = parser.parse_args()

    main(args.mail_task_id, dops_db_config, mpp_db_config, send_from)
+task_id='uds_dynamic_data_export', +command='python /data/airflow/etl/EXP/dynamic_data_export.py', +params={'my_param':"uds_dynamic_data_export"}, +depends_on_past=False, +retries=3, +dag=dag) + tickets_list_load >> tr_ticket_list_4585 tickets_list_load >> tr_ticket_detail_4125 tr_ticket_list_4585 >> t01_ccc_work_ticket_info tr_ticket_detail_4125 >> t01_ccc_work_ticket_info t01_ccc_work_ticket_info >> f_ccc_work_ticket_integ -f_ccc_work_ticket_integ >> task_failed -tickets_detail_load >> task_failed +f_ccc_work_ticket_integ >> uds_dynamic_data_export +uds_dynamic_data_export >> task_failed