add workflow CRM标签信息,dev

This commit is contained in:
root 2024-07-11 17:29:21 +08:00
parent b6c8dfd915
commit f92285a2a2
6 changed files with 250 additions and 9 deletions

View File

@ -57,14 +57,25 @@ dag=dag)
huiju_tags_feign >> huiju_tags_load
crm_tags_1237 = SSHOperator(
uds_crm_tag_update = SSHOperator(
ssh_hook=sshHook,
task_id='crm_tags_1237',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"S98_S_crm_tags"},
task_id='uds_crm_tag_update',
command='python /data/airflow/etl/Python/update_crm_tags.py',
params={'my_param':"uds_crm_tag_update"},
depends_on_past=False,
retries=3,
dag=dag)
huiju_tags_load >> crm_tags_1237
crm_tags_1237 >> task_failed
scrm_tags_5000 = SSHOperator(
ssh_hook=sshHook,
task_id='scrm_tags_5000',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} >>/data/airflow/logs/run_psql_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"S98_S_scrm_tags"},
depends_on_past=False,
retries=3,
dag=dag)
huiju_tags_load >> scrm_tags_5000
scrm_tags_5000 >> uds_crm_tag_update
uds_crm_tag_update >> task_failed

View File

@ -0,0 +1,124 @@
# coding: utf-8
import requests
import json
import psycopg2
import uuid
import datetime
import time
import hashlib
import time
#荟聚
#全局变量,便于参数使用的预设值
current_date = datetime.date.today() # 获取当前日期
previous_date = current_date - datetime.timedelta(days=1) # 获取前一天日期
formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取当前日期 - 标准化
formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ") # 获取前一天日期 - 标准化
timestamp = time.time() # 为Unix time即从"1970-01-01 00:00:00"至今的秒数;
auth_url='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
auth = {}
sel_sql = 'select * from data_api.api_data '
upt_sql = 'update data_api.api_data set flag=1 where id= %s'
def formatted2_previous_hour(h):
if h==0:
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
start_of_current_hour = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
# 减去一个小时,得到前一个小时的开始时间
start_of_previous_hour = start_of_current_hour - datetime.timedelta(hours=h)
return start_of_previous_hour.strftime("%Y-%m-%d %H:%M:%S")
def get_token(url):
#请求鉴权接口
authRequest=requests.get(url)
#解析结果
if not authRequest: #若为空时,返回空
return get_token(url)
auth=json.loads(authRequest.text)
while 'error' in auth and i < 60:
time.sleep(60)
auth = get_token(url)
i = i + 1
return auth
def get_data_from_db(sql):
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
cur=conn.cursor()
cur.execute(sql)
records = cur.fetchall()
conn.commit()
cur.close()
conn.close()
return records
def update_db_flag(sql, data):
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")
cur=conn.cursor()
cur.execute(sql)
conn.commit()
cur.close()
conn.close()
def request_custom_id(id):
print('开始请求线索id')
header={}
url='https://api.huiju.cool/v2/customerService/findCustomerByIdentity'
body={'access_token':auth['access_token'],'identityType':'cp_employee_tools_huiju_corp','identityValue':id}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
return resL
def request_update_tags(idStr, tagId ):
print('开始请求更新线索标签')
header={}
access_token = auth['access_token']
url=f'https://api.huiju.cool/v2/customerTags/bulkAdd?access_token={access_token}'
body=[{'tagId':tagId, 'customerIds':[idStr]}]
jsonData = json.dumps(body)
print(f'body: {jsonData}')
dataReqL=requests.post(url,headers=header,data=jsonData)
resL=json.loads(dataReqL.text)
return resL
if __name__ == "__main__":
print(f'{formatted2_previous_hour(0)}开始更新标签信息')
get_token(auth_url)
records = []
records = get_data_from_db(sel_sql)
if len(records) > 0:
print(f'此处需处理{len(records)}条标签信息')
for data in records:
id = data['id']
tagId = data['tagId']
print(f'开始处理客户({id}:{tagId})标签信息')
try:
resL = request_custom_id(id)
if 'error' in resL and resL['error']['code'] == 400010:
get_token(auth_url)
resL = request_custom_id(id)
if 'error' in resL:
print(f'获取客户线索id异常,接口返回信息:{resL}')
continue
idStr = resL['idStr']
if idStr == '':
print(f'获取客户线索id为空,接口返回信息:{resL}')
continue
resU = request_update_tags(idStr,tagId)
if 'error' in resU and resU['error']['code'] == 400010:
get_token(auth_url)
resU = request_update_tags(idStr,tagId)
if 'error' in resU:
print(f'获取客户线索id异常,接口返回信息:{resU}')
continue
update_db_flag(upt_sql,data)
except Exception as e:
print(f'处理客户({id}:{tagId})标签信息,异常信息:{e}')
else:
print(f'记录数为0此处无需处理标签信息')
print(f'{formatted2_previous_hour(0)}更新标签信息结束')

View File

@ -0,0 +1,46 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
delete from p10_sa.S98_S_scrm_tags
;
insert into p10_sa.S98_S_scrm_tags
( id
, name
, group_name
, type
, date_created
, last_updated
, etl_tx_dt )
select
id
, name
, group_name
, type
, date_created
, last_updated
, etl_tx_dt
from p00_tal.S98_S_scrm_tags
;
delete from p12_sfull.S98_S_scrm_tags
;
;
insert into p12_sfull.S98_S_scrm_tags
( id
, name
, group_name
, type
, date_created
, last_updated
, etl_tx_dt )
select
id
, name
, group_name
, type
, date_created
, last_updated
, etl_tx_dt
from p10_sa.S98_S_scrm_tags
;
\q

View File

@ -0,0 +1,18 @@
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_scrm_tags (
id TEXT
, name TEXT
, group_name TEXT
, type TEXT
, date_created TEXT
, last_updated TEXT
, etl_tx_dt TIMESTAMP
)
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'scrm_tags' );

View File

@ -0,0 +1,43 @@
create table if not exists p10_sa.S98_S_scrm_tags (
id TEXT
, name TEXT
, group_name TEXT
, type TEXT
, date_created TEXT
, last_updated TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.id IS '标签对应Id';
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.name IS '标签名';
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.group_name IS '标签分组名';
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.type IS '标签类型';
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.date_created IS '创建时间';
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.last_updated IS '上次更新时间';
COMMENT ON COLUMN p10_sa.S98_S_scrm_tags.etl_tx_dt IS '';
COMMENT ON TABLE p10_sa.S98_S_scrm_tags IS '';
create table if not exists p12_sfull.S98_S_scrm_tags (
id TEXT
, name TEXT
, group_name TEXT
, type TEXT
, date_created TEXT
, last_updated TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.id IS '标签对应Id';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.name IS '标签名';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.group_name IS '标签分组名';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.type IS '标签类型';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.date_created IS '创建时间';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.last_updated IS '上次更新时间';
COMMENT ON COLUMN p12_sfull.S98_S_scrm_tags.etl_tx_dt IS '';
COMMENT ON TABLE p12_sfull.S98_S_scrm_tags IS '';

View File

@ -53,12 +53,11 @@ while 'error' in auth and i < 60:
print('开始请求数据总数。')
url='https://api.huiju.cool/v2/tags'
header={}
body={'access_token':auth['access_token'],}
body={'access_token':auth['access_token'],'limit':'1000',}
dataReqL=requests.get(url,headers=header,params=body)
resL=json.loads(dataReqL.text)
# print(resL)
#dataList=resL['data']
dataList=resL
dataList=resL['data']
total=len(dataList)
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
host="172.17.0.8", port="5432")