add workflow SCRM标签信息,dev
This commit is contained in:
parent
a5bf88005a
commit
5c8dba0f9e
|
@ -19,8 +19,16 @@ timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至
|
||||||
|
|
||||||
auth_url='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
auth_url='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials'
|
||||||
auth = {}
|
auth = {}
|
||||||
sel_sql = 'select * from data_api.api_data '
|
identityType = 'cp_employee_tools_huiju_corp'
|
||||||
upt_sql = 'update data_api.api_data set flag=1 where id= %s'
|
sel_sql = "select * from p61_output.ccc_scrm_cust_label_info where sync_ind <> '2';"
|
||||||
|
upt_sql = "update p61_output.ccc_scrm_cust_label_info set sync_ind=%s, etl_proc_dt=current_timestamp(0) where cust_id = %s and scrm_label_id = %s"
|
||||||
|
ist_sql = "INSERT INTO p61_output.scrm_leads_external_id_mapping (leads_id, external_type, external_id, etl_proc_dt, etl_tx_dt) VALUES (%s, %s, %s,current_timestamp(0), current_date);"
|
||||||
|
|
||||||
|
database="daas_mpp"
|
||||||
|
user="dbuser_mpp"
|
||||||
|
password="T2zWCUdEdxd3"
|
||||||
|
host="172.17.0.8"
|
||||||
|
port="5432"
|
||||||
|
|
||||||
def formatted2_previous_hour(h):
|
def formatted2_previous_hour(h):
|
||||||
if h==0:
|
if h==0:
|
||||||
|
@ -43,37 +51,46 @@ def get_token(url):
|
||||||
i = i + 1
|
i = i + 1
|
||||||
return auth
|
return auth
|
||||||
|
|
||||||
def get_data_from_db(sql):
|
def get_data_from_db(cur, conn, sql):
|
||||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
# conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
|
||||||
host="172.17.0.8", port="5432")
|
# cur=conn.cursor()
|
||||||
cur=conn.cursor()
|
|
||||||
cur.execute(sql)
|
cur.execute(sql)
|
||||||
records = cur.fetchall()
|
records = cur.fetchall()
|
||||||
conn.commit()
|
conn.commit()
|
||||||
cur.close()
|
# cur.close()
|
||||||
conn.close()
|
# conn.close()
|
||||||
return records
|
return records
|
||||||
|
|
||||||
def update_db_flag(sql, data):
|
def update_db_flag(cur, conn, sql, status, cust_id, tag_id):
|
||||||
conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA",
|
# conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
|
||||||
host="172.17.0.8", port="5432")
|
# cur=conn.cursor()
|
||||||
cur=conn.cursor()
|
cur.execute(sql,[status,cust_id,tag_id])
|
||||||
cur.execute(sql)
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
cur.close()
|
# cur.close()
|
||||||
conn.close()
|
# conn.close()
|
||||||
|
|
||||||
|
def add_leads_id(cur, conn, sql, type, leads_id, extern_id):
|
||||||
|
try:
|
||||||
|
# conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
|
||||||
|
# cur=conn.cursor()
|
||||||
|
cur.execute(sql,[leads_id,type,extern_id])
|
||||||
|
conn.commit()
|
||||||
|
# cur.close()
|
||||||
|
# conn.close()
|
||||||
|
except Exception as e:
|
||||||
|
print(f'添加线索id异常,接口返回信息:{e}')
|
||||||
|
|
||||||
def request_custom_id(id):
|
def request_custom_id(id):
|
||||||
print('开始请求线索id')
|
print(f'开始请求线索id:{id}')
|
||||||
header={}
|
header={}
|
||||||
url='https://api.huiju.cool/v2/customerService/findCustomerByIdentity'
|
url='https://api.huiju.cool/v2/customerService/findCustomerByIdentity'
|
||||||
body={'access_token':auth['access_token'],'identityType':'cp_employee_tools_huiju_corp','identityValue':id}
|
body={'access_token':auth['access_token'],'identityType':identityType,'identityValue':id}
|
||||||
dataReqL=requests.get(url,headers=header,params=body)
|
dataReqL=requests.get(url,headers=header,params=body)
|
||||||
resL=json.loads(dataReqL.text)
|
resL=json.loads(dataReqL.text)
|
||||||
return resL
|
return resL
|
||||||
|
|
||||||
def request_update_tags(idStr, tagId ):
|
def request_update_tags(idStr, tagId ):
|
||||||
print('开始请求更新线索标签')
|
print(f'开始请求更新线索标签{idStr}|{tagId}')
|
||||||
header={}
|
header={}
|
||||||
access_token = auth['access_token']
|
access_token = auth['access_token']
|
||||||
url=f'https://api.huiju.cool/v2/customerTags/bulkAdd?access_token={access_token}'
|
url=f'https://api.huiju.cool/v2/customerTags/bulkAdd?access_token={access_token}'
|
||||||
|
@ -87,27 +104,37 @@ def request_update_tags(idStr, tagId ):
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
print(f'{formatted2_previous_hour(0)}开始更新标签信息')
|
print(f'{formatted2_previous_hour(0)}开始更新标签信息')
|
||||||
get_token(auth_url)
|
auth = get_token(auth_url)
|
||||||
records = []
|
records = []
|
||||||
records = get_data_from_db(sel_sql)
|
conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
|
||||||
|
cur=conn.cursor()
|
||||||
|
records = get_data_from_db(cur, conn, sel_sql)
|
||||||
if len(records) > 0:
|
if len(records) > 0:
|
||||||
print(f'此处需处理{len(records)}条标签信息')
|
print(f'此处需处理{len(records)}条标签信息')
|
||||||
for data in records:
|
for data in records:
|
||||||
id = data['id']
|
custId = data[0]
|
||||||
tagId = data['tagId']
|
id = data[1]
|
||||||
|
idStr = data[2]
|
||||||
|
tagId = data[5]
|
||||||
print(f'开始处理客户({id}:{tagId})标签信息')
|
print(f'开始处理客户({id}:{tagId})标签信息')
|
||||||
try:
|
try:
|
||||||
resL = request_custom_id(id)
|
f = False
|
||||||
if 'error' in resL and resL['error']['code'] == 400010:
|
s = '2'
|
||||||
get_token(auth_url)
|
if not idStr:
|
||||||
resL = request_custom_id(id)
|
resL = request_custom_id(id)
|
||||||
if 'error' in resL:
|
if 'error' in resL and resL['error']['code'] == 400010:
|
||||||
print(f'获取客户线索id异常,接口返回信息:{resL}')
|
get_token(auth_url)
|
||||||
continue
|
resL = request_custom_id(id)
|
||||||
idStr = resL['idStr']
|
if 'error' in resL:
|
||||||
if idStr == '':
|
print(f'获取客户线索id异常,接口返回信息:{resL}')
|
||||||
print(f'获取客户线索id为空,接口返回信息:{resL}')
|
continue
|
||||||
continue
|
idStr = resL['idStr']
|
||||||
|
if idStr == '':
|
||||||
|
print(f'获取客户线索id为空,接口返回信息:{resL}')
|
||||||
|
continue
|
||||||
|
f=True
|
||||||
|
if f:
|
||||||
|
add_leads_id(cur, conn, ist_sql,identityType, idStr, id)
|
||||||
resU = request_update_tags(idStr,tagId)
|
resU = request_update_tags(idStr,tagId)
|
||||||
if 'error' in resU and resU['error']['code'] == 400010:
|
if 'error' in resU and resU['error']['code'] == 400010:
|
||||||
get_token(auth_url)
|
get_token(auth_url)
|
||||||
|
@ -115,10 +142,11 @@ if __name__ == "__main__":
|
||||||
if 'error' in resU:
|
if 'error' in resU:
|
||||||
print(f'获取客户线索id异常,接口返回信息:{resU}')
|
print(f'获取客户线索id异常,接口返回信息:{resU}')
|
||||||
continue
|
continue
|
||||||
update_db_flag(upt_sql,data)
|
update_db_flag(cur, conn, upt_sql,s,custId,tagId)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f'处理客户({id}:{tagId})标签信息,异常信息:{e}')
|
print(f'处理客户({id}:{tagId})标签信息,异常信息:{e}')
|
||||||
else:
|
else:
|
||||||
print(f'记录数为0,此处无需处理标签信息')
|
print(f'记录数为0,此处无需处理标签信息')
|
||||||
|
cur.close()
|
||||||
|
conn.close()
|
||||||
print(f'{formatted2_previous_hour(0)}更新标签信息结束')
|
print(f'{formatted2_previous_hour(0)}更新标签信息结束')
|
||||||
|
|
|
@ -58,14 +58,14 @@ dag=dag)
|
||||||
huiju_tags_feign >> huiju_tags_load
|
huiju_tags_feign >> huiju_tags_load
|
||||||
|
|
||||||
|
|
||||||
# uds_crm_tag_update = SSHOperator(
|
uds_crm_tag_update = SSHOperator(
|
||||||
# ssh_hook=sshHook,
|
ssh_hook=sshHook,
|
||||||
# task_id='uds_crm_tag_update',
|
task_id='uds_crm_tag_update',
|
||||||
# command='python /data/airflow/etl/Python/crm_tag_update.py',
|
command='python /data/airflow/etl/Python/crm_tag_update.py',
|
||||||
# params={'my_param':"uds_crm_tag_update"},
|
params={'my_param':"uds_crm_tag_update"},
|
||||||
# depends_on_past=False,
|
depends_on_past=False,
|
||||||
# retries=3,
|
retries=3,
|
||||||
# dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
scrm_tags_5000 = SSHOperator(
|
scrm_tags_5000 = SSHOperator(
|
||||||
ssh_hook=sshHook,
|
ssh_hook=sshHook,
|
||||||
|
@ -77,7 +77,7 @@ retries=3,
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
huiju_tags_load >> scrm_tags_5000
|
huiju_tags_load >> scrm_tags_5000
|
||||||
scrm_tags_5000 >> task_failed
|
# scrm_tags_5000 >> task_failed
|
||||||
|
|
||||||
# scrm_tags_5000 >> uds_crm_tag_update
|
scrm_tags_5000 >> uds_crm_tag_update
|
||||||
# uds_crm_tag_update >> task_failed
|
uds_crm_tag_update >> task_failed
|
||||||
|
|
Loading…
Reference in New Issue