add workflow: 天润 (Tianrun) dynamic email sync, dev
This commit is contained in:
parent 3f51f5577e
commit 6381d35c35

@@ -0,0 +1,207 @@
import argparse
import datetime
import os
import random
import sys

import pandas as pd
import psycopg2
import psycopg2.extras
from sqlalchemy import create_engine, text


def export_view_to_csv(view_name, output_file, where_condition, db_config, output_path):
    # Make sure every connection parameter is present.
    for key in db_config:
        if db_config[key] is None:
            raise ValueError(f"Environment variable {key} is not set.")

    # Build the database connection string and the SQLAlchemy engine.
    db_uri = f"postgresql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
    engine = create_engine(db_uri)

    # The stored condition is expected to include the WHERE keyword itself;
    # fall back to a no-op filter when none is configured.
    if where_condition is None or where_condition.strip() == "":
        where_condition = " WHERE 1=1 "

    # Select every column of the view. Note that view_name and
    # where_condition are interpolated directly into the SQL, so they must
    # come from a trusted source (here: the mail-task config table).
    query = f"""
        SELECT * FROM {view_name} {where_condition};
    """

    # Read the query result into a DataFrame.
    df = pd.read_sql_query(query, engine)

    # Create the output directory if it does not exist yet.
    if not os.path.exists(output_path):
        os.makedirs(output_path)

    # Write the data to a CSV file.
    file_path = os.path.join(output_path, output_file)
    df.to_csv(file_path, index=False)
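# A minimal usage sketch (the view name is the one queried below; the file
# name, condition, and config values are hypothetical):
#
#   export_view_to_csv(
#       view_name="dp_custom.s_v_email_distribute",
#       output_file="report.csv",
#       where_condition="WHERE valid_ind = '1'",
#       db_config={"user": "...", "password": "...", "host": "...",
#                  "port": "5432", "dbname": "..."},
#       output_path="/tmp/export",
#   )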
def main(mail_task_id, dops_db_config, mpp_db_config, send_from):
    mail_task_result = get_email_info(mail_task_id, dops_db_config)
    print("Mail task info:", mail_task_result)
    # get_email_info() returns a list of rows, so an empty list (not None)
    # means the task was not found.
    if not mail_task_result:
        print(f'Mail task not found: {mail_task_id}')
        sys.exit(1)
    mail_task = mail_task_result[0]
    view_name = mail_task['table_name']
    send_to = mail_task['send_to']
    title = mail_task['title']
    content = mail_task['content']
    where_condition = mail_task['condition']
    output_file = mail_task['file_name']
    output_path = '/data/dops/app/dp-mail/tempFile'

    # Build a unique mail id from the current timestamp plus a 4-digit
    # random suffix, e.g. "202401011200000042".
    now = datetime.datetime.now()
    timestampid = now.strftime("%Y%m%d%H%M%S")
    current_time = now.strftime("%Y-%m-%d %H:%M:%S")
    random_part = '{:04d}'.format(random.randint(0, 9999))
    mail_id = f"{timestampid}{random_part}"
    title = title.replace('{date}', current_time)

    # Each mail gets its own attachment directory under output_path.
    file_dict_path = os.path.join(output_path, mail_id)

    export_view_to_csv(view_name, output_file, where_condition, mpp_db_config, file_dict_path)
    # export_view_to_excel(view_name, output_file, where_condition, mpp_db_config, file_dict_path)
    insert_mail_record(send_from, send_to, title, content, dops_db_config, mail_id, attachment=output_file)
def insert_mail_record(send_from, send_to, subject, mail_content, db_config, mail_id, **kwargs):
    # Build the database URI and the SQLAlchemy engine from db_config.
    db_uri = f"postgresql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
    engine = create_engine(db_uri)

    # Optional fields with their defaults.
    send_cc = kwargs.get('send_cc', None)
    send_bcc = kwargs.get('send_bcc', None)
    attachment = kwargs.get('attachment', None)
    send_status = kwargs.get('send_status', '0')
    src_system = kwargs.get('src_system', None)
    create_tm = kwargs.get('create_tm', datetime.datetime.now())
    create_person = kwargs.get('create_person', None)
    upt_tm = kwargs.get('upt_tm', datetime.datetime.now())
    upt_person = kwargs.get('upt_person', None)
    remark = kwargs.get('remark', None)

    # Assemble the new mail-queue record.
    new_record = {
        'mail_id': mail_id,
        'send_from': send_from,
        'send_to': send_to,
        'send_cc': send_cc,
        'send_bcc': send_bcc,
        'subject': subject,
        'mail_content': mail_content,
        'attachment': attachment,
        'send_status': send_status,
        'src_system': src_system,
        'create_tm': create_tm,
        'create_person': create_person,
        'upt_tm': upt_tm,
        'upt_person': upt_person,
        'remark': remark,
    }
    print(f'Inserting mail queue record: {new_record}')

    # engine.begin() opens a transaction that is committed on success and
    # rolled back on error.
    with engine.begin() as connection:
        insert_stmt = text("""
            INSERT INTO dp_email.mail_queue (
                id, send_from, send_to, send_cc, send_bcc, subject,
                mail_content, attachment, send_status, src_system,
                create_tm, create_person, upt_tm, upt_person, remark
            ) VALUES (
                :mail_id, :send_from, :send_to, :send_cc, :send_bcc, :subject,
                :mail_content, :attachment, :send_status, :src_system,
                :create_tm, :create_person, :upt_tm, :upt_person, :remark
            )
        """)
        connection.execute(insert_stmt, new_record)
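# Design note: engine.begin() is used above rather than a bare
# engine.connect(), so the INSERT is committed automatically on success.
# Under SQLAlchemy's 2.0-style API an engine.connect() block would need an
# explicit connection.commit(), or the row would be rolled back when the
# connection closes.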
def get_email_info(mail_task_id, dops_db_config):
    print(f'Fetching mail task info for id: {mail_task_id}')

    # Connect with a RealDictCursor so each row comes back as a dict keyed
    # by column name.
    conn = psycopg2.connect(**dops_db_config)
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    # Only active tasks (valid_ind = '1') are returned.
    query = """
        SELECT
            mail_task_id
            , send_to
            , title
            , content
            , table_name
            , condition
            , file_name
            , valid_ind
            , sort
        FROM dp_custom.s_v_email_distribute
        WHERE mail_task_id = %s and valid_ind='1';
    """
    cur.execute(query, (mail_task_id,))

    # Fetch the result, then close the cursor and connection.
    result = cur.fetchall()
    cur.close()
    conn.close()

    return result


# Example usage:
# mail_task_id = "your_mail_task_id"
# info = get_email_info(mail_task_id, dops_db_config)
# print(info)
if __name__ == "__main__":
    # The values below differ per environment and are configured manually.
    mpp_db_config = {
        'dbname': os.getenv('DATABASENAME'),
        'user': os.getenv('USERNAME'),
        'password': os.getenv('PGPASSWORD'),
        'host': os.getenv('DBIP'),
        'port': os.getenv('PORT')
    }

    dops_db_config = {
        'dbname': 'dataops_db',
        'user': os.getenv('USERNAME'),
        'password': os.getenv('PGPASSWORD'),
        'host': os.getenv('DBIP'),
        'port': os.getenv('PORT')
    }

    send_from = 'tek_newsletter@163.com'

    # Parse the mail task id from the command line.
    parser = argparse.ArgumentParser(description="Export PostgreSQL data to CSV and queue it for mailing")
    parser.add_argument("mail_task_id", help="ID of the mail task to export")
    args = parser.parse_args()

    main(args.mail_task_id, dops_db_config, mpp_db_config, send_from)
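# The first DAG below invokes this script over SSH with the mail task id as
# its single positional argument, e.g.:
#
#   python /data/airflow/etl/EXP/dynamic_data_export.py 001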
@@ -0,0 +1,64 @@
#!/usr/bin/python
# -*- encoding=utf-8 -*-
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator

from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule


sshHook = SSHHook(ssh_conn_id='ssh_air')
default_args = {
    'owner': 'tek_newsletter@163.com',
    'email_on_failure': True,
    'email_on_retry': True,
    'start_date': datetime(2024, 1, 1),
    'depends_on_past': False,
    'retries': 6,
    'retry_delay': timedelta(minutes=10),
}

# Run every 2 hours between 08:00 and 18:00.
dag = DAG('wf_dag_smart_ccc_dynamic_email', default_args=default_args,
          schedule_interval="0 8-18/2 * * *",
          catchup=False,
          dagrun_timeout=timedelta(minutes=160),
          max_active_runs=3)

# Alert mail, fired when an upstream task instance has failed.
task_failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="task_failed",
    to=["tek_newsletter@163.com"],
    cc=[""],
    subject="smart_ccc_dynamic_email_failed",
    html_content='<h3>Hello, the smart_ccc_dynamic_email job failed, please handle it promptly.</h3>')


dysql_update_dynamic_email_data = SSHOperator(
    ssh_hook=sshHook,
    task_id='dysql_update_dynamic_email_data',
    command='/data/airflow/etl/COM/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
    params={'my_param': "dysql_update_dynamic_email_data"},
    depends_on_past=False,
    retries=3,
    dag=dag)


uds_dynamic_data_export = SSHOperator(
    ssh_hook=sshHook,
    task_id='uds_dynamic_data_export',
    command='python /data/airflow/etl/EXP/dynamic_data_export.py 001 >>/data/airflow/logs/run_data_export.log',
    params={'my_param': "uds_dynamic_data_export"},
    depends_on_past=False,
    retries=3,
    dag=dag)

# Export first, then flag the exported rows; the alert hangs off the last task.
uds_dynamic_data_export >> dysql_update_dynamic_email_data
dysql_update_dynamic_email_data >> task_failed
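# A sketch (optional): feeding both tasks into the alert makes it fire on a
# failure of either one, instead of relying on failure propagation down the
# chain:
#
#   [uds_dynamic_data_export, dysql_update_dynamic_email_data] >> task_failed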
@@ -0,0 +1,51 @@
#!/usr/bin/python
# -*- encoding=utf-8 -*-
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator

from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule


sshHook = SSHHook(ssh_conn_id='ssh_85_air')
default_args = {
    'owner': 'sewp_dev',
    'email': [],
    'start_date': datetime(2022, 9, 12),
    'depends_on_past': False,
    'retries': 6,
    'retry_delay': timedelta(minutes=10),
}


# "${cronExp}" is a deployment-time placeholder, substituted per environment.
dag = DAG('Dag_Sql_update_dynamic_email_data', default_args=default_args,
          schedule_interval="${cronExp}",
          catchup=False,
          dagrun_timeout=timedelta(minutes=160),
          max_active_runs=3)


Sql_update_dynamic_email_data = SSHOperator(
    ssh_hook=sshHook,
    task_id='Sql_update_dynamic_email_data',
    command='python /data/airflow/etl/PDM/{{params.my_param}}.py {{ ds_nodash }} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
    params={'my_param': "sql_update_dynamic_email_data"},
    depends_on_past=False,
    retries=3,
    dag=dag)

# "已发送邮件数据标志" means "sent-mail data flag"; "${etlEmail}" is a
# deployment-time placeholder for the alert recipient.
task_failed = EmailOperator(
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    task_id="已发送邮件数据标志_failed",
    to=["${etlEmail}"],
    cc=[""],
    subject="已发送邮件数据标志_failed",
    html_content='<h3>已发送邮件数据标志_failed</h3>')

Sql_update_dynamic_email_data >> task_failed
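# For reference, Airflow renders the templated command above with ds_nodash,
# the run date as YYYYMMDD; for a hypothetical 2024-01-01 run:
#
#   python /data/airflow/etl/PDM/sql_update_dynamic_email_data.py 20240101 >>/data/airflow/logs/run_tpt_20240101.log 2>&1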
@@ -0,0 +1,11 @@
/********************************************************************************************/
/*******Main Section*************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on

-- Flag every ticket whose suffix appears in the dynamics template view as
-- processed (etl_batch_no = '1'). The terminating ';' is required: psql only
-- dispatches the buffered statement on ';', and '\q' alone would quit
-- without executing it.
update p30_common.f_ccc_work_ticket_integ
set etl_batch_no = '1'
where suffix in (select suffix from p61_output.v_f_ccc_dynamics_template);

\q
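-- A quick manual check after the update (a sketch, not part of the job):
--   select count(*) from p30_common.f_ccc_work_ticket_integ
--   where etl_batch_no = '1';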