# --- dynamic_data_export.py ---
# Export the rows of a configured database view to CSV and enqueue an email
# record pointing at the generated attachment.
import argparse
import datetime
import os
import random

import pandas as pd
from openpyxl import Workbook  # kept: used by the (disabled) excel export path
import psycopg2
import psycopg2.extras
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError


def export_view_to_csv(view_name, output_file, where_condition, db_config, output_path):
    """Dump `SELECT * FROM view_name [where_condition]` into a CSV file.

    Args:
        view_name: fully qualified view/table name to read.
        output_file: CSV file name (not a path).
        where_condition: optional "WHERE ..." clause; empty/None means no filter.
        db_config: dict with keys dbname/user/password/host/port.
        output_path: directory to write into; created if missing.

    Raises:
        ValueError: if any db_config value is unset.
    """
    # Fail fast when a connection setting was not provided via the environment.
    for key, value in db_config.items():
        if value is None:
            raise ValueError(f"Environment variable {key} is not set.")

    db_uri = (
        f"postgresql://{db_config['user']}:{db_config['password']}"
        f"@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
    )
    engine = create_engine(db_uri)

    # Normalize an absent filter to a harmless no-op clause.
    if where_condition is None or not where_condition.strip():
        where_condition = " WHERE 1=1 "

    # NOTE(review): view_name and where_condition are interpolated directly
    # into SQL. They come from the dp_custom.s_v_email_distribute config
    # table, not end users, but this is injection-prone if that table is
    # ever writable by untrusted parties — confirm and consider whitelisting.
    query = f"SELECT * FROM {view_name} {where_condition};"

    try:
        df = pd.read_sql_query(query, engine)
    finally:
        engine.dispose()  # release pooled connections promptly

    # exist_ok avoids the check-then-create race the original had.
    os.makedirs(output_path, exist_ok=True)
    df.to_csv(os.path.join(output_path, output_file), index=False)


def main(mail_task_id, dops_db_config, mpp_db_config, send_from):
    """Run one export-and-enqueue cycle for the given mail task id."""
    mail_task_result = get_email_info(mail_task_id, dops_db_config)
    print("邮件任务信息:", mail_task_result)
    # get_email_info returns a (possibly empty) list; the original tested
    # `== None`, which was never true, so an unknown task id crashed with
    # IndexError on the [0] below instead of exiting gracefully.
    if not mail_task_result:
        print(f'未找到邮件任务:{mail_task_id}')
        return

    mail_task = mail_task_result[0]
    view_name = mail_task['table_name']
    send_to = mail_task['send_to']
    title = mail_task['title']
    content = mail_task['content']
    where_condition = mail_task['condition']
    output_file = mail_task['file_name']
    output_path = '/data/dops/app/dp-mail/tempFile'

    # Mail id = second-resolution timestamp + 4 random digits.
    now = datetime.datetime.now()
    timestampid = now.strftime("%Y%m%d%H%M%S")
    current_time = now.strftime("%Y-%m-%d %H:%M:%S")
    random_part = '{:04d}'.format(random.randint(0, 9999))
    mail_id = f"{timestampid}{random_part}"
    title = title.replace('{date}', current_time)

    # Each mail gets its own attachment directory keyed by mail_id.
    file_dict_path = os.path.join(output_path, mail_id)

    export_view_to_csv(view_name, output_file, where_condition, mpp_db_config, file_dict_path)
    insert_mail_record(send_from, send_to, title, content, dops_db_config, mail_id,
                       attachment=output_file)


def insert_mail_record(send_from, send_to, subject, mail_content, db_config, mail_id, **kwargs):
    """Insert one row into the dp_email.mail_queue outbox table.

    Optional keyword args: send_cc, send_bcc, attachment,
    send_status (default '0'), src_system, create_tm, create_person,
    upt_tm, upt_person, remark.
    """
    db_uri = (
        f"postgresql://{db_config['user']}:{db_config['password']}"
        f"@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
    )
    engine = create_engine(db_uri)

    now = datetime.datetime.now()
    new_record = {
        'mail_id': mail_id,
        'send_from': send_from,
        'send_to': send_to,
        'send_cc': kwargs.get('send_cc'),
        'send_bcc': kwargs.get('send_bcc'),
        'subject': subject,
        'mail_content': mail_content,
        'attachment': kwargs.get('attachment'),
        'send_status': kwargs.get('send_status', '0'),
        'src_system': kwargs.get('src_system'),
        'create_tm': kwargs.get('create_tm', now),
        'create_person': kwargs.get('create_person'),
        'upt_tm': kwargs.get('upt_tm', now),
        'upt_person': kwargs.get('upt_person'),
        'remark': kwargs.get('remark'),
    }
    print(f'插入邮件队列记录:{new_record}')

    insert_stmt = text("""
        INSERT INTO dp_email.mail_queue (
            id, send_from, send_to, send_cc, send_bcc, subject,
            mail_content, attachment, send_status, src_system,
            create_tm, create_person, upt_tm, upt_person, remark
        ) VALUES (
            :mail_id, :send_from, :send_to, :send_cc, :send_bcc, :subject,
            :mail_content, :attachment, :send_status, :src_system,
            :create_tm, :create_person, :upt_tm, :upt_person, :remark
        )
    """)

    try:
        # engine.begin() opens a transaction and COMMITS on success. The
        # original used engine.connect() with no commit, so under
        # SQLAlchemy 2.x the insert was silently rolled back when the
        # connection closed.
        with engine.begin() as connection:
            connection.execute(insert_stmt, new_record)
    finally:
        engine.dispose()


def get_email_info(mail_task_id, dops_db_config):
    """Fetch the active (valid_ind='1') mail-task rows for mail_task_id.

    Returns a list of dict-like rows (RealDictCursor); empty when the
    task id is unknown.
    """
    print(f'获取id为:{mail_task_id} 邮件任务信息')
    query = """
    SELECT
    mail_task_id
    , send_to
    , title
    , content
    , table_name
    , condition
    , file_name
    , valid_ind
    , sort
    FROM dp_custom.s_v_email_distribute
    WHERE mail_task_id = %s and valid_ind='1';
    """
    # Context managers guarantee cursor/connection cleanup; the original
    # leaked the connection whenever the query raised.
    with psycopg2.connect(**dops_db_config) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, (mail_task_id,))
            return cur.fetchall()


if __name__ == "__main__":
    # 以下内容根据各环境不同,手动配置 (configured per environment)
    mpp_db_config = {
        'dbname': os.getenv('DATABASENAME'),
        'user': os.getenv('USERNAME'),
        'password': os.getenv('PGPASSWORD'),
        'host': os.getenv('DBIP'),
        'port': os.getenv('PORT'),
    }

    dops_db_config = {
        'dbname': 'dataops_db',
        'user': os.getenv('USERNAME'),
        'password': os.getenv('PGPASSWORD'),
        'host': os.getenv('DBIP'),
        'port': os.getenv('PORT'),
    }

    send_from = 'tek_newsletter@163.com'

    parser = argparse.ArgumentParser(description="Export PostgreSQL Data to Excel")
    parser.add_argument("mail_task_id", help="ID of the Mail Task to export")
    args = parser.parse_args()

    main(args.mail_task_id, dops_db_config, mpp_db_config, send_from)
#!/usr/bin/python
# -*- encoding=utf-8 -*-
# --- wf_dag_smart_ccc_dynamic_email.py ---
# Airflow DAG: export dynamic-email data, then mark the exported rows;
# sends a notification email if either task fails.
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import json

from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule


sshHook = SSHHook(ssh_conn_id='ssh_air')
default_args = {
    'owner': 'tek_newsletter@163.com',
    # The original set email_on_failure/email_on_retry without an 'email'
    # list, so Airflow had no recipient for those notifications.
    'email': ['tek_newsletter@163.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'start_date': datetime(2024, 1, 1),
    'depends_on_past': False,
    'retries': 6,
    'retry_delay': timedelta(minutes=10),
}

# Runs every 2 hours between 08:00 and 18:00.
dag = DAG('wf_dag_smart_ccc_dynamic_email', default_args=default_args,
          schedule_interval="0 8-18/2 * * *",
          catchup=False,
          dagrun_timeout=timedelta(minutes=160),
          max_active_runs=3)

# Fallback alert: fires as soon as ONE upstream task has failed.
task_failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="task_failed",
    to=["tek_newsletter@163.com"],
    cc=[""],
    subject="smart_ccc_dynamic_email_failed",
    html_content='您好,smart_ccc_dynamic_email作业失败,请及时处理')


# Flags already-sent rows via the shared run_psql.sh wrapper.
dysql_update_dynamic_email_data = SSHOperator(
    ssh_hook=sshHook,
    task_id='dysql_update_dynamic_email_data',
    command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
    params={'my_param': "dysql_update_dynamic_email_data"},
    depends_on_past=False,
    retries=3,
    dag=dag)


# Exports the view data and enqueues the mail (task id "001").
uds_dynamic_data_export = SSHOperator(
    ssh_hook=sshHook,
    task_id='uds_dynamic_data_export',
    command='python /data/airflow/etl/EXP/dynamic_data_export.py 001 >>/data/airflow/logs/run_data_export.log',
    params={'my_param': "uds_dynamic_data_export"},
    depends_on_past=False,
    retries=3,
    dag=dag)

# Export first, then flag the rows; alert if anything failed.
uds_dynamic_data_export >> dysql_update_dynamic_email_data
dysql_update_dynamic_email_data >> task_failed
#!/usr/bin/python
# -*- encoding=utf-8 -*-
# --- Dag_Sql_update_dynamic_email_data.py ---
# Airflow DAG (template: ${cronExp}/${etlEmail} substituted at deploy time):
# runs the sent-mail flagging SQL job and emails on failure.
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
import json

from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule


sshHook = SSHHook(ssh_conn_id='ssh_85_air')
default_args = {
    'owner': 'sewp_dev',
    'email': [],
    'start_date': datetime(2022, 9, 12),
    'depends_on_past': False,
    'retries': 6,
    'retry_delay': timedelta(minutes=10),
}


dag = DAG('Dag_Sql_update_dynamic_email_data', default_args=default_args,
          schedule_interval="${cronExp}",
          catchup=False,
          dagrun_timeout=timedelta(minutes=160),
          max_active_runs=3)


Sql_update_dynamic_email_data = SSHOperator(
    ssh_hook=sshHook,
    task_id='Sql_update_dynamic_email_data',
    command='python /data/airflow/etl/PDM/{{params.my_param}}.py {{ ds_nodash }} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
    params={'my_param': "sql_update_dynamic_email_data"},
    depends_on_past=False,
    retries=3,
    dag=dag)

task_failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    # Airflow task_ids must match [a-zA-Z0-9_.-]; the original Chinese
    # task_id ("已发送邮件数据标志_failed") fails DAG import validation.
    # The human-readable Chinese label is kept in subject/html_content.
    task_id="Sql_update_dynamic_email_data_failed",
    to=["${etlEmail}"],
    cc=[""],
    subject="已发送邮件数据标志_failed",
    html_content='已发送邮件数据标志_failed')

Sql_update_dynamic_email_data >> task_failed

# --- dysql_update_dynamic_email_data.sql (psql script run by the job above) ---
# /********************************************************************************************/
# /*******Main Section**************************************************************************/
# \set ON_ERROR_STOP on
# \set AUTOCOMMIT on
# \timing on
#
# update p30_common.f_ccc_work_ticket_integ
# set etl_batch_no ='1'
# where suffix in (select suffix from p61_output.v_f_ccc_dynamics_template)
#
# \q