diff --git a/dev/workflow/TK_Cust/smart_ccc_custom/CRM标签更新/crm_tag_update.py b/dev/workflow/TK_Cust/smart_ccc_custom/CRM标签更新/crm_tag_update.py index 13a358d..cb4de41 100644 --- a/dev/workflow/TK_Cust/smart_ccc_custom/CRM标签更新/crm_tag_update.py +++ b/dev/workflow/TK_Cust/smart_ccc_custom/CRM标签更新/crm_tag_update.py @@ -22,7 +22,9 @@ auth = {} identityType = 'cp_employee_tools_huiju_corp' sel_sql = "select * from p61_output.ccc_scrm_cust_label_info where sync_ind <> '2';" upt_sql = "update p61_output.ccc_scrm_cust_label_info set sync_ind=%s, etl_proc_dt=current_timestamp(0) where cust_id = %s and scrm_label_id = %s" -ist_sql = "INSERT INTO p61_output.scrm_leads_external_id_mapping (leads_id, external_type, external_id, etl_proc_dt, etl_tx_dt) VALUES (%s, %s, %s,current_timestamp(0), current_date);" +ist_sql = "INSERT INTO p61_output.scrm_leads_external_id_mapping (leads_id, external_type, external_id, etl_proc_dt, etl_tx_dt) VALUES (%s, %s, %s,current_timestamp(0), current_date) on CONFLICT (leads_id, external_type) DO NOTHING;" +# ist_sql = "delete from p61_output.scrm_leads_external_id_mapping where leads_id=%s and external_type = %s ; INSERT INTO p61_output.scrm_leads_external_id_mapping (leads_id, external_type, external_id, etl_proc_dt, etl_tx_dt) VALUES (%s, %s, %s,current_timestamp(0), current_date);" + database="daas_mpp" user="dbuser_mpp" @@ -51,32 +53,32 @@ def get_token(url): i = i + 1 return auth -def get_data_from_db(cur, conn, sql): - # conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) - # cur=conn.cursor() +def get_data_from_db(sql): + conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) + cur=conn.cursor() cur.execute(sql) records = cur.fetchall() conn.commit() - # cur.close() - # conn.close() + cur.close() + conn.close() return records -def update_db_flag(cur, conn, sql, status, cust_id, tag_id): - # conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) - # cur=conn.cursor() +def update_db_flag(sql, status, cust_id, tag_id): + conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) + cur=conn.cursor() cur.execute(sql,[status,cust_id,tag_id]) conn.commit() - # cur.close() - # conn.close() + cur.close() + conn.close() -def add_leads_id(cur, conn, sql, type, leads_id, extern_id): +def add_leads_id(sql, type, leads_id, extern_id): try: - # conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) - # cur=conn.cursor() + conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) + cur=conn.cursor() cur.execute(sql,[leads_id,type,extern_id]) conn.commit() - # cur.close() - # conn.close() + cur.close() + conn.close() except Exception as e: print(f'添加线索id异常,接口返回信息:{e}') @@ -106,9 +108,9 @@ if __name__ == "__main__": print(f'{formatted2_previous_hour(0)}开始更新标签信息') auth = get_token(auth_url) records = [] - conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) - cur=conn.cursor() - records = get_data_from_db(cur, conn, sel_sql) + # conn = psycopg2.connect(database=database, user=user, password=password,host=host, port=port) + # cur=conn.cursor() + records = get_data_from_db( sel_sql) if len(records) > 0: print(f'此处需处理{len(records)}条标签信息') for data in records: @@ -134,7 +136,10 @@ if __name__ == "__main__": continue f=True if f: - add_leads_id(cur, conn, ist_sql,identityType, idStr, id) + add_leads_id(ist_sql,identityType, idStr, id) + if tagId is None: + print(f'标签id为空,无法更新标签{tagId}') + continue resU = request_update_tags(idStr,tagId) if 'error' in resU and resU['error']['code'] == 400010: get_token(auth_url) @@ -142,11 +147,9 @@ if __name__ == "__main__": if 'error' in resU: print(f'获取客户线索id异常,接口返回信息:{resU}') continue - update_db_flag(cur, conn, upt_sql,s,custId,tagId) + update_db_flag( upt_sql,s,custId,tagId) except Exception as e: print(f'处理客户({id}:{tagId})标签信息,异常信息:{e}') else: print(f'记录数为0,此处无需处理标签信息') - cur.close() - conn.close() print(f'{formatted2_previous_hour(0)}更新标签信息结束')