diff --git a/dev/workflow/TK_Cust/crm_tags/CRM标签信息/wf_dag_crm_tags.py b/dev/workflow/TK_Cust/crm_tags/CRM标签信息/wf_dag_crm_tags.py new file mode 100644 index 0000000..44bd366 --- /dev/null +++ b/dev/workflow/TK_Cust/crm_tags/CRM标签信息/wf_dag_crm_tags.py @@ -0,0 +1,70 @@ +#!/usr/bin/python +# -*- encoding=utf-8 -*- +from airflow import DAG +from datetime import datetime, timedelta +from airflow.contrib.hooks.ssh_hook import SSHHook +from airflow.contrib.operators.ssh_operator import SSHOperator +from airflow.sensors.external_task_sensor import ExternalTaskSensor +import json + +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.operators.email_operator import EmailOperator +from airflow.utils.trigger_rule import TriggerRule + + +sshHook = SSHHook(ssh_conn_id ='ssh_air') +default_args = { +'owner': 'info@idgvalue.com', +'email_on_failure': True, +'email_on_retry':True, +'start_date': datetime(2024, 1, 1), +'depends_on_past': False, +'retries': 6, +'retry_delay': timedelta(minutes=10), +} + +dag = DAG('wf_dag_crm_tags', default_args=default_args, +schedule_interval="0 0-23/1 * * *", +catchup=False, +dagrun_timeout=timedelta(minutes=160), +max_active_runs=3) + +task_failed = EmailOperator ( + dag=dag, + trigger_rule=TriggerRule.ONE_FAILED, + task_id="task_failed", + to=["info@idgvalue.com"], + cc=[""], + subject="crm_tags_failed", + html_content='<h3>您好,crm_tags作业失败,请及时处理" </h3>') + +huiju_tags_feign = SSHOperator( +ssh_hook=sshHook, +task_id='huiju_tags_feign', +command='python3 /data/airflow/etl/API/huiju_tags_feign.py', +depends_on_past=False, +retries=3, +dag=dag) + +huiju_tags_load = SSHOperator( +ssh_hook=sshHook, +task_id='huiju_tags_load', +command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}', +params={'my_param':"huiju_tags_load"}, +depends_on_past=False, +retries=3, +dag=dag) + +huiju_tags_feign >> huiju_tags_load + +crm_tags_1237 = SSHOperator( +ssh_hook=sshHook, +task_id='crm_tags_1237', +command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ', +params={'my_param':"S98_S_crm_tags"}, +depends_on_past=False, +retries=3, +dag=dag) + +huiju_tags_load >> crm_tags_1237 +crm_tags_1237 >> task_failed diff --git a/dev/workflow/TK_Cust/crm_tags/crm_tags/S98_S_crm_tags.sql b/dev/workflow/TK_Cust/crm_tags/crm_tags/S98_S_crm_tags.sql new file mode 100644 index 0000000..8171e57 --- /dev/null +++ b/dev/workflow/TK_Cust/crm_tags/crm_tags/S98_S_crm_tags.sql @@ -0,0 +1,46 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on +delete from p10_sa.S98_S_crm_tags +; +insert into p10_sa.S98_S_crm_tags +( id + , name + , group_name + , type + , date_created + , last_updated + , etl_tx_dt ) + select + id + , name + , group_name + , type + , date_created + , last_updated + , etl_tx_dt + from p00_tal.S98_S_crm_tags + ; + delete from p12_sfull.S98_S_crm_tags +; +; +insert into p12_sfull.S98_S_crm_tags +( id + , name + , group_name + , type + , date_created + , last_updated + , etl_tx_dt ) + select + id + , name + , group_name + , type + , date_created + , last_updated + , etl_tx_dt + from p10_sa.S98_S_crm_tags +; +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/crm_tags/crm_tags/sa_foreign_tables.sql b/dev/workflow/TK_Cust/crm_tags/crm_tags/sa_foreign_tables.sql new file mode 100644 index 0000000..8f24fba --- /dev/null +++ b/dev/workflow/TK_Cust/crm_tags/crm_tags/sa_foreign_tables.sql @@ -0,0 +1,18 @@ + +CREATE FOREIGN TABLE if not exists p00_tal.S98_S_crm_tags ( + id TEXT + , name TEXT + , group_name TEXT + , type TEXT + , date_created TEXT + , last_updated TEXT + , etl_tx_dt TIMESTAMP +) + + +SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'crm_tags' ); + + + + + diff --git a/dev/workflow/TK_Cust/crm_tags/crm_tags/sa_tables.sql b/dev/workflow/TK_Cust/crm_tags/crm_tags/sa_tables.sql new file mode 100644 index 0000000..e855d97 --- /dev/null +++ b/dev/workflow/TK_Cust/crm_tags/crm_tags/sa_tables.sql @@ -0,0 +1,43 @@ + +create table if not exists p10_sa.S98_S_crm_tags ( + id TEXT + , name TEXT + , group_name TEXT + , type TEXT + , date_created TEXT + , last_updated TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p10_sa.S98_S_crm_tags.id IS '标签对应Id'; + COMMENT ON COLUMN p10_sa.S98_S_crm_tags.name IS '标签名'; + COMMENT ON COLUMN p10_sa.S98_S_crm_tags.group_name IS '标签分组名'; + COMMENT ON COLUMN p10_sa.S98_S_crm_tags.type IS '标签类型'; + COMMENT ON COLUMN p10_sa.S98_S_crm_tags.date_created IS '创建时间'; + COMMENT ON COLUMN p10_sa.S98_S_crm_tags.last_updated IS '上次更新时间'; + COMMENT ON COLUMN p10_sa.S98_S_crm_tags.etl_tx_dt IS ''; + +COMMENT ON TABLE p10_sa.S98_S_crm_tags IS ''; + + + +create table if not exists p12_sfull.S98_S_crm_tags ( + id TEXT + , name TEXT + , group_name TEXT + , type TEXT + , date_created TEXT + , last_updated TEXT + , etl_tx_dt TIMESTAMP +) ; + + COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.id IS '标签对应Id'; + COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.name IS '标签名'; + COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.group_name IS '标签分组名'; + COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.type IS '标签类型'; + COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.date_created IS '创建时间'; + COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.last_updated IS '上次更新时间'; + COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.etl_tx_dt IS ''; + +COMMENT ON TABLE p12_sfull.S98_S_crm_tags IS ''; + diff --git a/dev/workflow/TK_Cust/crm_tags/荟聚标签信息/huiju_tags_feign.py b/dev/workflow/TK_Cust/crm_tags/荟聚标签信息/huiju_tags_feign.py new file mode 100644 index 0000000..338780c --- /dev/null +++ b/dev/workflow/TK_Cust/crm_tags/荟聚标签信息/huiju_tags_feign.py @@ -0,0 +1,74 @@ +# coding: utf-8 +import requests +import json +import psycopg2 +import uuid +import datetime +import time +import hashlib +import time + +#荟聚 + +#全局变量,便于参数使用的预设值 +current_date = datetime.date.today() # 获取当前日期 +previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期 +formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化 +formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化 +timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至今的秒数; +sign_version = 'v2' # 签名版本号,固定值v2 +nonce = str(uuid.uuid4()) + +#获取签名令牌 +def sign_data(email, open_api_token, timestamp, nonce, sign_version): + # 按照指定的格式拼接字符串 + data_to_sign = f"{email}&{open_api_token}&{timestamp}&{nonce}&{sign_version}" + # 使用SHA256算法计算哈希值 + sha256_hash = hashlib.sha256(data_to_sign.encode()).hexdigest() + return sha256_hash + +#获取鉴权token +def get_token(url): + #请求鉴权接口 + authRequest=requests.get(url) + #解析结果 + if not authRequest: #若为空时,返回空 + return + auth=json.loads(authRequest.text) + return auth + +print('开始加载数据:huiju_tags:荟聚标签信息') +authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials' + +print('开始请求令牌。') +#authRequest=requests.get(authUrl) +#auth=json.loads(authRequest.text) +auth = get_token(authUrl) +#循环判断auth是否为空,若为空,等待30s后重新请求 +i = 0 +while 'error' in auth and i < 60: + time.sleep(60) + auth = get_token(authUrl) + i = i + 1 +print('开始请求数据总数。') +url='https://api.huiju.cool/v2/tags' +header={} +body={'access_token':auth['access_token'],} +dataReqL=requests.get(url,headers=header,params=body) +resL=json.loads(dataReqL.text) +# print(resL) +dataList=resL['data'] +total=len(dataList) +conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA", + host="172.17.0.8", port="5432") +print('数据库连接成功') +dataId=str(uuid.uuid4()) +print('临时id:'+dataId) +json_object = json.dumps(dataList) +cur=conn.cursor() +sql="update data_api.api_data set is_loaded = '1' where api_id = 'a7cf7f4d-108d-410c-9308-e13dfc56';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')" +cur.execute(sql,[dataId,'a7cf7f4d-108d-410c-9308-e13dfc56',json_object,total]) +conn.commit() +cur.close() +conn.close() +print('加载数据结束:huiju_tags:荟聚标签信息') \ No newline at end of file diff --git a/dev/workflow/TK_Cust/crm_tags/荟聚标签信息/huiju_tags_load.sql b/dev/workflow/TK_Cust/crm_tags/荟聚标签信息/huiju_tags_load.sql new file mode 100644 index 0000000..dc2b9c7 --- /dev/null +++ b/dev/workflow/TK_Cust/crm_tags/荟聚标签信息/huiju_tags_load.sql @@ -0,0 +1,42 @@ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +DELETE FROM data_api.crm_tags; + +insert into data_api.crm_tags ( + id + , name + , group_name + , type + , date_created + , last_updated + ,etl_tx_dt +) +select + case when trim(both from id)='' then null else id::text end id + , case when trim(both from name)='' then null else name::text end name + , case when trim(both from group_name)='' then null else group_name::text end group_name + , case when trim(both from type)='' then null else type::text end type + , case when trim(both from date_created)='' then null else date_created::text end date_created + , case when trim(both from last_updated)='' then null else last_updated::text end last_updated +,etl_tx_dt +from ( +select + (json_array_elements(data::json)::json->>'id') id + , (json_array_elements(data::json)::json->>'name') name + , (json_array_elements(data::json)::json->>'groupName') group_name + , (json_array_elements(data::json)::json->>'type') type + , (json_array_elements(data::json)::json->>'dateCreated') date_created + , (json_array_elements(data::json)::json->>'lastUpdated') last_updated + ,CURRENT_TIMESTAMP(0) etl_tx_dt +from (select * from data_api.api_data +WHERE api_id='a7cf7f4d-108d-410c-9308-e13dfc56' and is_loaded = '0' order by request_tm desc limit 1) p )p; + +update data_api.api_data +set is_loaded = '1' , + status = '1', + request_tm = current_timestamp(0) +where api_id='a7cf7f4d-108d-410c-9308-e13dfc56'; +\q \ No newline at end of file