add workflow 天润Smart-ccc work ticket data, dev

root 2024-07-31 14:19:59 +08:00
parent 41c8141dde
commit 535bac12ff
2 changed files with 219 additions and 21 deletions

View File

@@ -0,0 +1,207 @@
import os
import argparse
import datetime
import random

import pandas as pd
import psycopg2
import psycopg2.extras
from sqlalchemy import create_engine, text

def export_view_to_csv(view_name, output_file, where_condition, db_config, output_path):
    # Make sure every required connection setting is present
    for key in db_config:
        if db_config[key] is None:
            raise ValueError(f"Environment variable {key} is not set.")
    # Build the database connection string
    db_uri = f"postgresql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
    # Create the SQLAlchemy engine
    engine = create_engine(db_uri)
    if where_condition is None or where_condition.strip() == "":
        where_condition = " WHERE 1=1 "
    # Build the SQL query, selecting every column of the view
    query = f"""
    SELECT * FROM {view_name} {where_condition};
    """
    # Read the query result into a DataFrame
    df = pd.read_sql_query(query, engine)
    # Full path of the CSV file to write
    file_path = os.path.join(output_path, output_file)
    # Create the output directory if it does not exist yet
    os.makedirs(output_path, exist_ok=True)
    # Write the data to CSV
    df.to_csv(file_path, index=False)
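
# A minimal sketch of calling export_view_to_csv directly (every value below is
# hypothetical; in normal runs the view, condition, and file name come from
# dp_custom.s_v_email_distribute):
#   export_view_to_csv(
#       view_name='dp_custom.some_view',
#       output_file='tickets.csv',
#       where_condition="WHERE create_tm >= '2024-07-01'",
#       db_config=mpp_db_config,
#       output_path='/tmp/export_demo')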

def main(mail_task_id, dops_db_config, mpp_db_config, send_from):
    mail_task_result = get_email_info(mail_task_id, dops_db_config)
    print("Mail task info:", mail_task_result)
    # fetchall() returns an empty list when nothing matches
    if not mail_task_result:
        print(f'Mail task not found: {mail_task_id}')
        raise SystemExit(1)
    mail_task = mail_task_result[0]
    view_name = mail_task['table_name']
    send_to = mail_task['send_to']
    title = mail_task['title']
    content = mail_task['content']
    where_condition = mail_task['condition']
    output_file = mail_task['file_name']
    output_path = '/data/dops/app/dp-mail/tempFile'
    # Current time, used for both the mail id and the {date} placeholder
    now = datetime.datetime.now()
    timestampid = now.strftime("%Y%m%d%H%M%S")
    current_time = now.strftime("%Y-%m-%d %H:%M:%S")
    random_part = '{:04d}'.format(random.randint(0, 9999))
    # Combine the timestamp with a random suffix to form the mail id
    mail_id = f"{timestampid}{random_part}"
    title = title.replace('{date}', current_time)
    file_dict_path = os.path.join(output_path, mail_id)
    export_view_to_csv(view_name, output_file, where_condition, mpp_db_config, file_dict_path)
    # export_view_to_excel(view_name, output_file, where_condition, mpp_db_config, file_dict_path)
    insert_mail_record(send_from, send_to, title, content, dops_db_config, mail_id, attachment=output_file)

def insert_mail_record(send_from, send_to, subject, mail_content, db_config, mail_id, **kwargs):
    # Build the database URI from db_config
    db_uri = f"postgresql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['dbname']}"
    # Create the SQLAlchemy engine
    engine = create_engine(db_uri)
    # Optional fields and their defaults
    send_cc = kwargs.get('send_cc', None)
    send_bcc = kwargs.get('send_bcc', None)
    attachment = kwargs.get('attachment', None)
    send_status = kwargs.get('send_status', '0')
    src_system = kwargs.get('src_system', None)
    create_tm = kwargs.get('create_tm', datetime.datetime.now())
    create_person = kwargs.get('create_person', None)
    upt_tm = kwargs.get('upt_tm', datetime.datetime.now())
    upt_person = kwargs.get('upt_person', None)
    remark = kwargs.get('remark', None)
    # Build the new mail-queue record
    new_record = {
        'mail_id': mail_id,
        'send_from': send_from,
        'send_to': send_to,
        'send_cc': send_cc,
        'send_bcc': send_bcc,
        'subject': subject,
        'mail_content': mail_content,
        'attachment': attachment,
        'send_status': send_status,
        'src_system': src_system,
        'create_tm': create_tm,
        'create_person': create_person,
        'upt_tm': upt_tm,
        'upt_person': upt_person,
        'remark': remark,
    }
    print(f'Inserting mail queue record: {new_record}')
    # engine.begin() opens a transaction that is committed on success and
    # rolled back on error; a bare engine.connect() would leave the INSERT
    # uncommitted
    with engine.begin() as connection:
        insert_stmt = text("""
            INSERT INTO dp_email.mail_queue (
                id, send_from, send_to, send_cc, send_bcc, subject,
                mail_content, attachment, send_status, src_system,
                create_tm, create_person, upt_tm, upt_person, remark
            ) VALUES (
                :mail_id, :send_from, :send_to, :send_cc, :send_bcc, :subject,
                :mail_content, :attachment, :send_status, :src_system,
                :create_tm, :create_person, :upt_tm, :upt_person, :remark
            )
        """)
        connection.execute(insert_stmt, new_record)
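
# Example call (all values hypothetical; send_status defaults to '0', which
# presumably marks the record as queued for the downstream dp-mail sender):
#   insert_mail_record('noreply@example.com', 'ops@example.com',
#                      'Daily ticket export', 'See the attached file.',
#                      dops_db_config, '202407311419590042',
#                      attachment='tickets.csv')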

def get_email_info(mail_task_id, dops_db_config):
    print(f'Fetching mail task info for id {mail_task_id}')
    # Database connection parameters
    db_params = dops_db_config
    # Connect to the database
    conn = psycopg2.connect(**db_params)
    # RealDictCursor returns each row as a dict keyed by column name
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    # Query the mail-distribution view
    query = """
        SELECT
            mail_task_id
            , send_to
            , title
            , content
            , table_name
            , condition
            , file_name
            , valid_ind
            , sort
        FROM dp_custom.s_v_email_distribute
        WHERE mail_task_id = %s AND valid_ind = '1';
    """
    cur.execute(query, (mail_task_id,))
    # Fetch all matching rows
    result = cur.fetchall()
    # Close the cursor and the connection
    cur.close()
    conn.close()
    return result

# Usage:
# mail_task_id = "your_mail_task_id"
# info = get_email_info(mail_task_id, dops_db_config)
# print(info)

if __name__ == "__main__":
    # The settings below differ per environment and are configured manually
    mpp_db_config = {
        'dbname': os.getenv('DATABASENAME'),
        'user': os.getenv('USERNAME'),
        'password': os.getenv('PGPASSWORD'),
        'host': os.getenv('DBIP'),
        'port': os.getenv('PORT')
    }
    dops_db_config = {
        'dbname': 'dataops_db',
        'user': os.getenv('USERNAME'),
        'password': os.getenv('PGPASSWORD'),
        'host': os.getenv('DBIP'),
        'port': os.getenv('PORT')
    }
    send_from = 'tek_newsletter@163.com'
    # Command-line argument parser
    parser = argparse.ArgumentParser(description="Export PostgreSQL data to CSV and queue a mail record")
    parser.add_argument("mail_task_id", help="ID of the mail task to export")
    args = parser.parse_args()
    # Run main with the command-line argument
    main(args.mail_task_id, dops_db_config, mpp_db_config, send_from)
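
# Example invocation (a sketch; every value below is a placeholder). The DAG
# change in this commit runs the script from
# /data/airflow/etl/EXP/dynamic_data_export.py:
#   export DATABASENAME=mpp_db USERNAME=etl PGPASSWORD=secret DBIP=10.0.0.1 PORT=5432
#   python /data/airflow/etl/EXP/dynamic_data_export.py 1001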

View File

@@ -75,25 +75,6 @@ depends_on_past=False,
    retries=3,
    dag=dag)
tickets_detail_feign = SSHOperator(
    ssh_hook=sshHook,
    task_id='tickets_detail_feign',
    command='python3 /data/airflow/etl/API/tickets_detail_feign.py',
    depends_on_past=False,
    retries=3,
    dag=dag)
tickets_detail_load = SSHOperator(
    ssh_hook=sshHook,
    task_id='tickets_detail_load',
    command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{ params.my_param }}',
    params={'my_param': "tickets_detail_load"},
    depends_on_past=False,
    retries=3,
    dag=dag)
tickets_detail_feign >> tickets_detail_load
t01_ccc_work_ticket_info = SSHOperator(
    ssh_hook=sshHook,
    task_id='t01_ccc_work_ticket_info',
@@ -110,10 +91,20 @@ params={'my_param':"f_ccc_work_ticket_integ_agi"},
    depends_on_past=False,
    retries=3,
    dag=dag)
uds_dynamic_data_export = SSHOperator(
    ssh_hook=sshHook,
    task_id='uds_dynamic_data_export',
    command='python /data/airflow/etl/EXP/dynamic_data_export.py',
    params={'my_param': "uds_dynamic_data_export"},
    depends_on_past=False,
    retries=3,
    dag=dag)
tickets_list_load >> tr_ticket_list_4585
tickets_list_load >> tr_ticket_detail_4125
tr_ticket_list_4585 >> t01_ccc_work_ticket_info
tr_ticket_detail_4125 >> t01_ccc_work_ticket_info
t01_ccc_work_ticket_info >> f_ccc_work_ticket_integ
f_ccc_work_ticket_integ >> task_failed
tickets_detail_load >> task_failed
f_ccc_work_ticket_integ >> uds_dynamic_data_export
uds_dynamic_data_export >> task_failed