add workflow CRM标签信息,dev
This commit is contained in:
parent
13c9fb9207
commit
2591c018e4
|
@ -0,0 +1,70 @@
|
||||||
|
#!/usr/bin/python
|
||||||
|
# -*- encoding=utf-8 -*-
|
||||||
|
from airflow import DAG
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from airflow.contrib.hooks.ssh_hook import SSHHook
|
||||||
|
from airflow.contrib.operators.ssh_operator import SSHOperator
|
||||||
|
from airflow.sensors.external_task_sensor import ExternalTaskSensor
|
||||||
|
import json
|
||||||
|
|
||||||
|
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
||||||
|
from airflow.operators.email_operator import EmailOperator
|
||||||
|
from airflow.utils.trigger_rule import TriggerRule
|
||||||
|
|
||||||
|
|
||||||
|
# SSH connection (Airflow connection id 'ssh_air') shared by the remote tasks of this DAG.
sshHook = SSHHook(ssh_conn_id='ssh_air')

# Defaults applied to every task of the DAG unless a task overrides them.
default_args = dict(
    owner='info@idgvalue.com',
    email_on_failure=True,
    email_on_retry=True,
    start_date=datetime(2024, 1, 1),
    depends_on_past=False,
    retries=6,
    retry_delay=timedelta(minutes=10),
)

# CRM tags workflow: the cron expression fires at minute 0 of every hour.
dag = DAG(
    'wf_dag_crm_tags',
    default_args=default_args,
    schedule_interval="0 0-23/1 * * *",
    catchup=False,
    dagrun_timeout=timedelta(minutes=160),
    max_active_runs=3,
)
|
||||||
|
|
||||||
|
# Failure notification: e-mails the owner as soon as an upstream task has failed
# (TriggerRule.ONE_FAILED).
# Fixes compared with the previous revision:
#   * the HTML body no longer contains a stray double quote before </h3>;
#   * the cc list with a single empty-string address was dropped, so no empty
#     recipient is handed to the mail backend.
task_failed = EmailOperator(
    task_id="task_failed",
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
    to=["info@idgvalue.com"],
    subject="crm_tags_failed",
    html_content='<h3>您好,crm_tags作业失败,请及时处理</h3>',
)
|
||||||
|
|
||||||
|
# Step 1: run the huiju_tags_feign.py script on the remote host.
huiju_tags_feign = SSHOperator(
    task_id='huiju_tags_feign',
    dag=dag,
    ssh_hook=sshHook,
    command='python3 /data/airflow/etl/API/huiju_tags_feign.py',
    retries=3,
    depends_on_past=False,
)

# Step 2: run run_psql.sh with the execution date and the script name 'huiju_tags_load'.
huiju_tags_load = SSHOperator(
    task_id='huiju_tags_load',
    dag=dag,
    ssh_hook=sshHook,
    command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}',
    params={'my_param': "huiju_tags_load"},
    retries=3,
    depends_on_past=False,
)

# Step 3: run run_sa.sh for 'S98_S_crm_tags'; stdout/stderr are appended to a
# per-day log file on the remote host.
crm_tags_1237 = SSHOperator(
    task_id='crm_tags_1237',
    dag=dag,
    ssh_hook=sshHook,
    command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
    params={'my_param': "S98_S_crm_tags"},
    retries=3,
    depends_on_past=False,
)

# Linear pipeline; the notification task hangs off the last step.
huiju_tags_feign >> huiju_tags_load >> crm_tags_1237 >> task_failed
|
|
@ -0,0 +1,46 @@
|
||||||
|
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on

-- Full refresh of p10_sa.S98_S_crm_tags from p00_tal.S98_S_crm_tags, followed by a
-- full refresh of p12_sfull.S98_S_crm_tags from p10_sa.S98_S_crm_tags.
-- Everything runs in one explicit transaction: with ON_ERROR_STOP a failing
-- statement ends the psql session before COMMIT, so the deletes are rolled back
-- and the target tables are never left empty or half-refreshed.
BEGIN;

delete from p10_sa.S98_S_crm_tags
;

insert into p10_sa.S98_S_crm_tags
     ( id
     , name
     , group_name
     , type
     , date_created
     , last_updated
     , etl_tx_dt )
select
       id
     , name
     , group_name
     , type
     , date_created
     , last_updated
     , etl_tx_dt
  from p00_tal.S98_S_crm_tags
;

delete from p12_sfull.S98_S_crm_tags
;

insert into p12_sfull.S98_S_crm_tags
     ( id
     , name
     , group_name
     , type
     , date_created
     , last_updated
     , etl_tx_dt )
select
       id
     , name
     , group_name
     , type
     , date_created
     , last_updated
     , etl_tx_dt
  from p10_sa.S98_S_crm_tags
;

COMMIT;

\q
|
|
@ -0,0 +1,18 @@
|
||||||
|
|
||||||
|
-- Foreign table in schema p00_tal that maps to data_api.crm_tags on the
-- foreign server pgsql_server_S98_S.
CREATE FOREIGN TABLE IF NOT EXISTS p00_tal.S98_S_crm_tags (
    id            TEXT,
    name          TEXT,
    group_name    TEXT,
    type          TEXT,
    date_created  TEXT,
    last_updated  TEXT,
    etl_tx_dt     TIMESTAMP
)
SERVER pgsql_server_S98_S
OPTIONS (schema_name 'data_api', table_name 'crm_tags');
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
|
||||||
|
-- p10_sa copy of the CRM tags table, with column comments.
CREATE TABLE IF NOT EXISTS p10_sa.S98_S_crm_tags (
    id            TEXT,
    name          TEXT,
    group_name    TEXT,
    type          TEXT,
    date_created  TEXT,
    last_updated  TEXT,
    etl_tx_dt     TIMESTAMP
);

COMMENT ON COLUMN p10_sa.S98_S_crm_tags.id           IS '标签对应Id';
COMMENT ON COLUMN p10_sa.S98_S_crm_tags.name         IS '标签名';
COMMENT ON COLUMN p10_sa.S98_S_crm_tags.group_name   IS '标签分组名';
COMMENT ON COLUMN p10_sa.S98_S_crm_tags.type         IS '标签类型';
COMMENT ON COLUMN p10_sa.S98_S_crm_tags.date_created IS '创建时间';
COMMENT ON COLUMN p10_sa.S98_S_crm_tags.last_updated IS '上次更新时间';
-- NOTE(review): the two comments below are empty strings, which presumably
-- leaves the objects without a comment -- supply real text if one is wanted.
COMMENT ON COLUMN p10_sa.S98_S_crm_tags.etl_tx_dt    IS '';

COMMENT ON TABLE p10_sa.S98_S_crm_tags IS '';
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
-- p12_sfull copy of the CRM tags table, with column comments.
CREATE TABLE IF NOT EXISTS p12_sfull.S98_S_crm_tags (
    id            TEXT,
    name          TEXT,
    group_name    TEXT,
    type          TEXT,
    date_created  TEXT,
    last_updated  TEXT,
    etl_tx_dt     TIMESTAMP
);

COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.id           IS '标签对应Id';
COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.name         IS '标签名';
COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.group_name   IS '标签分组名';
COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.type         IS '标签类型';
COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.date_created IS '创建时间';
COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.last_updated IS '上次更新时间';
-- NOTE(review): the two comments below are empty strings, which presumably
-- leaves the objects without a comment -- supply real text if one is wanted.
COMMENT ON COLUMN p12_sfull.S98_S_crm_tags.etl_tx_dt    IS '';

COMMENT ON TABLE p12_sfull.S98_S_crm_tags IS '';
|
||||||
|
|
|
@ -0,0 +1,74 @@
|
||||||
|
# coding: utf-8
|
||||||
|
import requests
|
||||||
|
import json
|
||||||
|
import psycopg2
|
||||||
|
import uuid
|
||||||
|
import datetime
|
||||||
|
import time
|
||||||
|
import hashlib
|
||||||
|
import time
|
||||||
|
|
||||||
|
# Huiju

# Module-level presets, kept here so they can be reused as request parameters.
_ISO_Z_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

sign_version = 'v2'  # signature version, fixed value "v2"
nonce = str(uuid.uuid4())  # random per-run nonce
timestamp = time.time()  # Unix time: seconds since 1970-01-01 00:00:00

current_date = datetime.date.today()  # today's date
previous_date = current_date - datetime.timedelta(days=1)  # yesterday's date
# Both values are plain dates, so the time part always renders as 00:00:00.
formatted_current_date = current_date.strftime(_ISO_Z_FORMAT)  # today, normalized
formatted_previous_date = previous_date.strftime(_ISO_Z_FORMAT)  # yesterday, normalized
|
||||||
|
|
||||||
|
# Build the signature token.
def sign_data(email, open_api_token, timestamp, nonce, sign_version):
    """Return the SHA-256 hex digest of the five arguments joined with '&'.

    The arguments are concatenated in the order email, open_api_token,
    timestamp, nonce, sign_version, encoded with the default str encoding
    and hashed.
    """
    payload = "{}&{}&{}&{}&{}".format(
        email, open_api_token, timestamp, nonce, sign_version
    )
    digest = hashlib.sha256(payload.encode())
    return digest.hexdigest()
|
||||||
|
|
||||||
|
# Fetch the authorization token.
def get_token(url, timeout=30):
    """Request the authorization endpoint and return its decoded JSON body.

    Args:
        url: Full URL of the token endpoint, including query parameters.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the job indefinitely.

    Returns:
        The parsed JSON payload, or None when the request fails, the server
        answers with a non-success status code, or the body is not valid JSON.
        Returning None (instead of raising) lets the caller retry.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        print('token request failed: {}'.format(exc))
        return None

    # Explicit form of the truthiness check on the response object.
    if not response.ok:
        print('token request returned HTTP {}'.format(response.status_code))
        return None

    try:
        return json.loads(response.text)
    except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
        print('token response is not valid JSON: {}'.format(exc))
        return None
|
||||||
|
|
||||||
|
# Main flow: obtain an access token, download the tag list and store the raw
# JSON payload as a new, not-yet-loaded row in data_api.api_data.
print('开始加载数据:huiju_tags:荟聚标签信息')

# NOTE(security): the app secret below and the database password further down
# are hard-coded in source control; they should be moved to a secret store or
# to environment variables and rotated.
authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'

print('开始请求令牌。')
auth = get_token(authUrl)

# Retry every 60 seconds, at most 60 times, while no usable answer came back.
# get_token returns None on failure, so None must be tested before the
# membership check (otherwise "'error' in None" raises TypeError).
i = 0
while (not auth or 'error' in auth) and i < 60:
    time.sleep(60)
    auth = get_token(authUrl)
    i = i + 1

# Fail with a clear message instead of a KeyError when every retry was used up.
if not auth or 'access_token' not in auth:
    raise RuntimeError('could not obtain a Huiju access token: {!r}'.format(auth))

print('开始请求数据总数。')
url='https://api.huiju.cool/v2/tags'
header={}
body={'access_token':auth['access_token'],}
dataReqL=requests.get(url,headers=header,params=body,timeout=60)
# Stop on an HTTP error instead of trying to parse an error page as tag data.
dataReqL.raise_for_status()
resL=json.loads(dataReqL.text)
dataList=resL['data']
total=len(dataList)

conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
                        host="172.17.0.8", port="5432")
try:
    print('数据库连接成功')
    dataId=str(uuid.uuid4())
    print('临时id:'+dataId)
    json_object = json.dumps(dataList)
    # Mark earlier payloads of this API as loaded, then insert the new payload
    # as the single pending (is_loaded = '0') row.
    sql="update data_api.api_data set is_loaded = '1' where api_id = 'a7cf7f4d-108d-410c-9308-e13dfc56';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')"
    with conn.cursor() as cur:
        cur.execute(sql,[dataId,'a7cf7f4d-108d-410c-9308-e13dfc56',json_object,total])
    conn.commit()
finally:
    # Always release the connection; closing without commit discards any
    # uncommitted work.
    conn.close()

print('加载数据结束:huiju_tags:荟聚标签信息')
|
|
@ -0,0 +1,42 @@
|
||||||
|
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on

-- Rebuild data_api.crm_tags from the newest pending payload (is_loaded = '0')
-- stored in data_api.api_data for this API id.
-- The whole refresh runs in one transaction: with ON_ERROR_STOP a failing
-- statement ends the session before COMMIT, so the DELETE is rolled back and
-- the table is never left empty by a failed insert.
BEGIN;

-- Only clear the table when there is a pending payload to reload from;
-- otherwise a run without new data would wipe the previously loaded tags.
DELETE FROM data_api.crm_tags
 WHERE EXISTS (
       SELECT 1
         FROM data_api.api_data
        WHERE api_id = 'a7cf7f4d-108d-410c-9308-e13dfc56'
          AND is_loaded = '0'
       );

insert into data_api.crm_tags (
       id
     , name
     , group_name
     , type
     , date_created
     , last_updated
     , etl_tx_dt
)
select
       -- blank strings are stored as NULL
       case when trim(both from id)='' then null else id::text end id
     , case when trim(both from name)='' then null else name::text end name
     , case when trim(both from group_name)='' then null else group_name::text end group_name
     , case when trim(both from type)='' then null else type::text end type
     , case when trim(both from date_created)='' then null else date_created::text end date_created
     , case when trim(both from last_updated)='' then null else last_updated::text end last_updated
     , etl_tx_dt
  from (
       -- The JSON array is expanded once via LATERAL instead of once per column.
       select
              (e.elem->>'id')          as id
            , (e.elem->>'name')        as name
            , (e.elem->>'groupName')   as group_name
            , (e.elem->>'type')        as type
            , (e.elem->>'dateCreated') as date_created
            , (e.elem->>'lastUpdated') as last_updated
            , CURRENT_TIMESTAMP(0)     as etl_tx_dt
         from (select data
                 from data_api.api_data
                where api_id='a7cf7f4d-108d-410c-9308-e13dfc56' and is_loaded = '0'
                order by request_tm desc
                limit 1) src
        cross join lateral json_array_elements(src.data::json) as e(elem)
       ) p;

-- NOTE(review): this updates every row of the API id, including rows that were
-- already loaded, and overwrites their request_tm -- confirm that is intended.
update data_api.api_data
   set is_loaded = '1' ,
       status = '1',
       request_tm = current_timestamp(0)
 where api_id='a7cf7f4d-108d-410c-9308-e13dfc56';

COMMIT;

\q
|
Loading…
Reference in New Issue