add workflow 天润Smart-ccc客户数据,dev

This commit is contained in:
root 2024-08-01 16:31:45 +08:00
parent 0590572d07
commit 2fcc452226
1 changed files with 26 additions and 23 deletions

View File

@ -22,7 +22,9 @@ auth = {}
identityType = 'cp_employee_tools_huiju_corp' identityType = 'cp_employee_tools_huiju_corp'
sel_sql = "select * from p61_output.ccc_scrm_cust_label_info where sync_ind <> '2';" sel_sql = "select * from p61_output.ccc_scrm_cust_label_info where sync_ind <> '2';"
upt_sql = "update p61_output.ccc_scrm_cust_label_info set sync_ind=%s, etl_proc_dt=current_timestamp(0) where cust_id = %s and scrm_label_id = %s" upt_sql = "update p61_output.ccc_scrm_cust_label_info set sync_ind=%s, etl_proc_dt=current_timestamp(0) where cust_id = %s and scrm_label_id = %s"
ist_sql = "INSERT INTO p61_output.scrm_leads_external_id_mapping (leads_id, external_type, external_id, etl_proc_dt, etl_tx_dt) VALUES (%s, %s, %s,current_timestamp(0), current_date);" ist_sql = "INSERT INTO p61_output.scrm_leads_external_id_mapping (leads_id, external_type, external_id, etl_proc_dt, etl_tx_dt) VALUES (%s, %s, %s,current_timestamp(0), current_date) on CONFLICT (leads_id, external_type) DO NOTHING;"
# ist_sql = "delete from p61_output.scrm_leads_external_id_mapping where leads_id=%s and external_type = %s ; INSERT INTO p61_output.scrm_leads_external_id_mapping (leads_id, external_type, external_id, etl_proc_dt, etl_tx_dt) VALUES (%s, %s, %s,current_timestamp(0), current_date);"
database="daas_mpp" database="daas_mpp"
user="dbuser_mpp" user="dbuser_mpp"
@ -51,32 +53,32 @@ def get_token(url):
i = i + 1 i = i + 1
return auth return auth
def get_data_from_db(cur, conn, sql): def get_data_from_db(sql):
# conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
# cur=conn.cursor() cur=conn.cursor()
cur.execute(sql) cur.execute(sql)
records = cur.fetchall() records = cur.fetchall()
conn.commit() conn.commit()
# cur.close() cur.close()
# conn.close() conn.close()
return records return records
def update_db_flag(cur, conn, sql, status, cust_id, tag_id): def update_db_flag(sql, status, cust_id, tag_id):
# conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
# cur=conn.cursor() cur=conn.cursor()
cur.execute(sql,[status,cust_id,tag_id]) cur.execute(sql,[status,cust_id,tag_id])
conn.commit() conn.commit()
# cur.close() cur.close()
# conn.close() conn.close()
def add_leads_id(cur, conn, sql, type, leads_id, extern_id): def add_leads_id(sql, type, leads_id, extern_id):
try: try:
# conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
# cur=conn.cursor() cur=conn.cursor()
cur.execute(sql,[leads_id,type,extern_id]) cur.execute(sql,[leads_id,type,extern_id])
conn.commit() conn.commit()
# cur.close() cur.close()
# conn.close() conn.close()
except Exception as e: except Exception as e:
print(f'添加线索id异常,接口返回信息:{e}') print(f'添加线索id异常,接口返回信息:{e}')
@ -106,9 +108,9 @@ if __name__ == "__main__":
print(f'{formatted2_previous_hour(0)}开始更新标签信息') print(f'{formatted2_previous_hour(0)}开始更新标签信息')
auth = get_token(auth_url) auth = get_token(auth_url)
records = [] records = []
conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) # conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port)
cur=conn.cursor() # cur=conn.cursor()
records = get_data_from_db(cur, conn, sel_sql) records = get_data_from_db( sel_sql)
if len(records) > 0: if len(records) > 0:
print(f'此处需处理{len(records)}条标签信息') print(f'此处需处理{len(records)}条标签信息')
for data in records: for data in records:
@ -134,7 +136,10 @@ if __name__ == "__main__":
continue continue
f=True f=True
if f: if f:
add_leads_id(cur, conn, ist_sql,identityType, idStr, id) add_leads_id(ist_sql,identityType, idStr, id)
if tagId is None:
print(f'标签id为空无法更新标签{tagId}')
continue
resU = request_update_tags(idStr,tagId) resU = request_update_tags(idStr,tagId)
if 'error' in resU and resU['error']['code'] == 400010: if 'error' in resU and resU['error']['code'] == 400010:
get_token(auth_url) get_token(auth_url)
@ -142,11 +147,9 @@ if __name__ == "__main__":
if 'error' in resU: if 'error' in resU:
print(f'获取客户线索id异常,接口返回信息:{resU}') print(f'获取客户线索id异常,接口返回信息:{resU}')
continue continue
update_db_flag(cur, conn, upt_sql,s,custId,tagId) update_db_flag( upt_sql,s,custId,tagId)
except Exception as e: except Exception as e:
print(f'处理客户({id}:{tagId})标签信息,异常信息:{e}') print(f'处理客户({id}:{tagId})标签信息,异常信息:{e}')
else: else:
print(f'记录数为0此处无需处理标签信息') print(f'记录数为0此处无需处理标签信息')
cur.close()
conn.close()
print(f'{formatted2_previous_hour(0)}更新标签信息结束') print(f'{formatted2_previous_hour(0)}更新标签信息结束')