diff --git a/dev/workflow/TK_Cust/crm_tags/CRM标签更新/crm_tag_update.py b/dev/workflow/TK_Cust/crm_tags/CRM标签更新/crm_tag_update.py index b85311c..344ad96 100644 --- a/dev/workflow/TK_Cust/crm_tags/CRM标签更新/crm_tag_update.py +++ b/dev/workflow/TK_Cust/crm_tags/CRM标签更新/crm_tag_update.py @@ -19,8 +19,16 @@ timestamp = time.time() # 为Unix time,即从"1970-01-01 00:00:00"至 auth_url='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials' auth = {} -sel_sql = 'select * from data_api.api_data ' -upt_sql = 'update data_api.api_data set flag=1 where id= %s' +identityType = 'cp_employee_tools_huiju_corp' +sel_sql = "select * from p61_output.ccc_scrm_cust_label_info where sync_ind <> '2';" +upt_sql = "update p61_output.ccc_scrm_cust_label_info set sync_ind=%s, etl_proc_dt=current_timestamp(0) where cust_id = %s and scrm_label_id = %s" +ist_sql = "INSERT INTO p61_output.scrm_leads_external_id_mapping (leads_id, external_type, external_id, etl_proc_dt, etl_tx_dt) VALUES (%s, %s, %s,current_timestamp(0), current_date);" + +database="daas_mpp" +user="dbuser_mpp" +password="T2zWCUdEdxd3" +host="172.17.0.8" +port="5432" def formatted2_previous_hour(h): if h==0: @@ -43,37 +51,46 @@ def get_token(url): i = i + 1 return auth -def get_data_from_db(sql): - conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA", - host="172.17.0.8", port="5432") - cur=conn.cursor() +def get_data_from_db(cur, conn, sql): + # conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) + # cur=conn.cursor() cur.execute(sql) records = cur.fetchall() conn.commit() - cur.close() - conn.close() + # cur.close() + # conn.close() return records -def update_db_flag(sql, data): - conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA", - host="172.17.0.8", port="5432") - cur=conn.cursor() - cur.execute(sql) +def update_db_flag(cur, conn, sql, status, cust_id, tag_id): + # conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) + # cur=conn.cursor() + cur.execute(sql,[status,cust_id,tag_id]) conn.commit() - cur.close() - conn.close() + # cur.close() + # conn.close() + +def add_leads_id(cur, conn, sql, type, leads_id, extern_id): + try: + # conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) + # cur=conn.cursor() + cur.execute(sql,[leads_id,type,extern_id]) + conn.commit() + # cur.close() + # conn.close() + except Exception as e: + print(f'添加线索id异常,接口返回信息:{e}') def request_custom_id(id): - print('开始请求线索id') + print(f'开始请求线索id:{id}') header={} url='https://api.huiju.cool/v2/customerService/findCustomerByIdentity' - body={'access_token':auth['access_token'],'identityType':'cp_employee_tools_huiju_corp','identityValue':id} + body={'access_token':auth['access_token'],'identityType':identityType,'identityValue':id} dataReqL=requests.get(url,headers=header,params=body) resL=json.loads(dataReqL.text) return resL def request_update_tags(idStr, tagId ): - print('开始请求更新线索标签') + print(f'开始请求更新线索标签{idStr}|{tagId}') header={} access_token = auth['access_token'] url=f'https://api.huiju.cool/v2/customerTags/bulkAdd?access_token={access_token}' @@ -87,27 +104,37 @@ def request_update_tags(idStr, tagId ): if __name__ == "__main__": print(f'{formatted2_previous_hour(0)}开始更新标签信息') - get_token(auth_url) + auth = get_token(auth_url) records = [] - records = get_data_from_db(sel_sql) + conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) + cur=conn.cursor() + records = get_data_from_db(cur, conn, sel_sql) if len(records) > 0: print(f'此处需处理{len(records)}条标签信息') for data in records: - id = data['id'] - tagId = data['tagId'] + custId = data[0] + id = data[1] + idStr = data[2] + tagId = data[5] print(f'开始处理客户({id}:{tagId})标签信息') try: - resL = request_custom_id(id) - if 'error' in resL and resL['error']['code'] == 400010: - get_token(auth_url) + f = False + s = '2' + if not idStr: resL = request_custom_id(id) - if 'error' in resL: - print(f'获取客户线索id异常,接口返回信息:{resL}') - continue - idStr = resL['idStr'] - if idStr == '': - print(f'获取客户线索id为空,接口返回信息:{resL}') - continue + if 'error' in resL and resL['error']['code'] == 400010: + get_token(auth_url) + resL = request_custom_id(id) + if 'error' in resL: + print(f'获取客户线索id异常,接口返回信息:{resL}') + continue + idStr = resL['idStr'] + if idStr == '': + print(f'获取客户线索id为空,接口返回信息:{resL}') + continue + f=True + if f: + add_leads_id(cur, conn, ist_sql,identityType, idStr, id) resU = request_update_tags(idStr,tagId) if 'error' in resU and resU['error']['code'] == 400010: get_token(auth_url) @@ -115,10 +142,11 @@ if __name__ == "__main__": if 'error' in resU: print(f'获取客户线索id异常,接口返回信息:{resU}') continue - update_db_flag(upt_sql,data) + update_db_flag(cur, conn, upt_sql,s,custId,tagId) except Exception as e: print(f'处理客户({id}:{tagId})标签信息,异常信息:{e}') else: print(f'记录数为0,此处无需处理标签信息') - + cur.close() + conn.close() print(f'{formatted2_previous_hour(0)}更新标签信息结束') diff --git a/dev/workflow/TK_Cust/crm_tags/SCRM标签信息/wf_dag_crm_tags.py b/dev/workflow/TK_Cust/crm_tags/SCRM标签信息/wf_dag_crm_tags.py index 5633cda..82110f1 100644 --- a/dev/workflow/TK_Cust/crm_tags/SCRM标签信息/wf_dag_crm_tags.py +++ b/dev/workflow/TK_Cust/crm_tags/SCRM标签信息/wf_dag_crm_tags.py @@ -58,14 +58,14 @@ dag=dag) huiju_tags_feign >> huiju_tags_load -# uds_crm_tag_update = SSHOperator( -# ssh_hook=sshHook, -# task_id='uds_crm_tag_update', -# command='python /data/airflow/etl/Python/crm_tag_update.py', -# params={'my_param':"uds_crm_tag_update"}, -# depends_on_past=False, -# retries=3, -# dag=dag) +uds_crm_tag_update = SSHOperator( +ssh_hook=sshHook, +task_id='uds_crm_tag_update', +command='python /data/airflow/etl/Python/crm_tag_update.py', +params={'my_param':"uds_crm_tag_update"}, +depends_on_past=False, +retries=3, +dag=dag) scrm_tags_5000 = SSHOperator( ssh_hook=sshHook, @@ -77,7 +77,7 @@ retries=3, dag=dag) huiju_tags_load >> scrm_tags_5000 -scrm_tags_5000 >> task_failed +# scrm_tags_5000 >> task_failed -# scrm_tags_5000 >> uds_crm_tag_update -# uds_crm_tag_update >> task_failed +scrm_tags_5000 >> uds_crm_tag_update +uds_crm_tag_update >> task_failed