clear workflow
This commit is contained in:
		
							parent
							
								
									98c339c998
								
							
						
					
					
						commit
						4481248f5a
					
				|  | @ -1,59 +0,0 @@ | ||||||
| #!/usr/bin/python |  | ||||||
| # -*- encoding=utf-8 -*- |  | ||||||
| from airflow import DAG |  | ||||||
| from datetime import datetime, timedelta |  | ||||||
| from airflow.contrib.hooks.ssh_hook import SSHHook |  | ||||||
| from airflow.contrib.operators.ssh_operator import SSHOperator |  | ||||||
| from airflow.sensors.external_task_sensor import ExternalTaskSensor |  | ||||||
| import json |  | ||||||
| 
 |  | ||||||
| from airflow.operators.email_operator import EmailOperator |  | ||||||
| from airflow.utils.trigger_rule import TriggerRule |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| sshHook = SSHHook(ssh_conn_id ='ssh_air') |  | ||||||
| default_args = { |  | ||||||
| 'owner': 'info@idgvalue.com', |  | ||||||
| 'email': [''], |  | ||||||
| 'email_on_failure': True, |  | ||||||
| 'email_on_retry':True, |  | ||||||
| 'start_date': datetime(2022, 9, 12), |  | ||||||
| 'depends_on_past': False, |  | ||||||
| 'retries': 6, |  | ||||||
| 'retry_delay': timedelta(minutes=10), |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| dag = DAG('wf_dag_tk_api', default_args=default_args, |  | ||||||
| schedule_interval="0 0 * * *", |  | ||||||
| catchup=False, |  | ||||||
| dagrun_timeout=timedelta(minutes=160), |  | ||||||
| max_active_runs=3) |  | ||||||
| 
 |  | ||||||
| task_failed = EmailOperator ( |  | ||||||
|     dag=dag, |  | ||||||
|     trigger_rule=TriggerRule.ONE_FAILED, |  | ||||||
|     task_id="task_failed", |  | ||||||
|     to=["info@idgvalue.com"], |  | ||||||
|     cc=[""], |  | ||||||
|     subject="tk_api_failed", |  | ||||||
|     html_content='<h3>您好,tk_api作业失败,请及时处理" </h3>') |  | ||||||
| 
 |  | ||||||
| scrm_contact_feign = SSHOperator( |  | ||||||
| ssh_hook=sshHook, |  | ||||||
| task_id='scrm_contact_feign', |  | ||||||
| command='python3 /data/airflow/etl/API/scrm_contact_feign.py', |  | ||||||
| depends_on_past=False,  |  | ||||||
| retries=3,  |  | ||||||
| dag=dag) |  | ||||||
| 
 |  | ||||||
| scrm_contact_load = SSHOperator( |  | ||||||
| ssh_hook=sshHook, |  | ||||||
| task_id='scrm_contact_load', |  | ||||||
| command='/data/airflow/etl/API/run_psql.sh {{ ds_nodash }} {{params.my_param }}', |  | ||||||
| params={'my_param':"scrm_contact_load"}, |  | ||||||
| depends_on_past=False,  |  | ||||||
| retries=3,  |  | ||||||
| dag=dag) |  | ||||||
| 
 |  | ||||||
| scrm_contact_feign >> scrm_contact_load |  | ||||||
| 
 |  | ||||||
|  | @ -1,41 +0,0 @@ | ||||||
| # coding: utf-8 |  | ||||||
| import requests |  | ||||||
| import json |  | ||||||
| import psycopg2 |  | ||||||
| import uuid |  | ||||||
| import datetime   |  | ||||||
| 
 |  | ||||||
| # 获取当前日期   |  | ||||||
| current_date = datetime.date.today()  |  | ||||||
| previous_date = current_date - datetime.timedelta(days=1) |  | ||||||
| formatted_current_date = current_date.strftime("%Y-%m-%dT%H:%M:%SZ")   |  | ||||||
| formatted_previous_date = previous_date.strftime("%Y-%m-%dT%H:%M:%SZ")  |  | ||||||
| 
 |  | ||||||
| print('开始加载数据:scrm_contact:获取SCRM contact') |  | ||||||
| authUrl='https://api.huiju.cool/v2/oauth2/token?app_id=cl037b184ebccd97c&secret=deaa83fdd8d385eec0d1d04d34282c4ec8c33b60&grant_type=client_credentials' |  | ||||||
|   |  | ||||||
| print('开始请求令牌。') |  | ||||||
| authRequest=requests.get(authUrl) |  | ||||||
| auth=json.loads(authRequest.text) |  | ||||||
| print('开始请求数据总数。') |  | ||||||
| url='https://api.huiju.cool/v2/customers' |  | ||||||
| header={} |  | ||||||
| body={'access_token':auth['access_token'],'sort':'dateJoin','dateJoin[ge]':formatted_previous_date,'dateJoin[le]':formatted_current_date,'limit':'',} |  | ||||||
| dataReqL=requests.get(url,headers=header,params=body) |  | ||||||
| resL=json.loads(dataReqL.text) |  | ||||||
| # print(resL) |  | ||||||
| dataList=resL['data'] |  | ||||||
| total=len(dataList) |  | ||||||
| conn = psycopg2.connect(database="dataops_db", user="dbuser_dops", password="MIgTi3jA", |  | ||||||
|                                 host="172.17.0.8", port="5432") |  | ||||||
| print('数据库连接成功') |  | ||||||
| dataId=str(uuid.uuid4()) |  | ||||||
| print('临时id:'+dataId) |  | ||||||
| json_object = json.dumps(dataList) |  | ||||||
| cur=conn.cursor() |  | ||||||
| sql="update data_api.api_data set is_loaded = '1' where api_id = 'cab1a761-b8e5-4db4-a769-88db3ec1';INSERT INTO data_api.api_data (id,api_id,data,total_num,is_loaded,status,request_tm,execute_tm,remark) values (%s,%s,%s,%s,'0','0',current_timestamp(0),current_timestamp(0),'')" |  | ||||||
| cur.execute(sql,[dataId,'cab1a761-b8e5-4db4-a769-88db3ec1',json_object,total]) |  | ||||||
| conn.commit() |  | ||||||
| cur.close() |  | ||||||
| conn.close() |  | ||||||
| print('加载数据结束:scrm_contact:获取SCRM contact') |  | ||||||
|  | @ -1,105 +0,0 @@ | ||||||
| /*******Main Section**************************************************************************/ |  | ||||||
| \set ON_ERROR_STOP on |  | ||||||
| \set AUTOCOMMIT on |  | ||||||
| \timing on |  | ||||||
|   |  | ||||||
| DELETE FROM data_api.scrm_contact; |  | ||||||
| 
 |  | ||||||
| insert into  data_api.scrm_contact (  |  | ||||||
| 	 img  |  | ||||||
| 	, not_acceptsms  |  | ||||||
| 	, gender  |  | ||||||
| 	, create_method  |  | ||||||
| 	, mobile_verified  |  | ||||||
| 	, language  |  | ||||||
| 	, is_member  |  | ||||||
| 	, source  |  | ||||||
| 	, is_referrer  |  | ||||||
| 	, create_from  |  | ||||||
| 	, last_updated  |  | ||||||
| 	, date_created  |  | ||||||
| 	, acceptsms  |  | ||||||
| 	, id  |  | ||||||
| 	, subscribe_scene  |  | ||||||
| 	, id_str  |  | ||||||
| 	, is_employee  |  | ||||||
| 	, version  |  | ||||||
| 	, not_accept_email  |  | ||||||
| 	, date_join  |  | ||||||
| 	, content_name  |  | ||||||
| 	, update_method  |  | ||||||
| 	, email_verified  |  | ||||||
| 	, is_anonymous  |  | ||||||
| 	, accept_email  |  | ||||||
| 	, create_from_name  |  | ||||||
| 	, status  |  | ||||||
| 	,etl_tx_dt |  | ||||||
| ) |  | ||||||
| select  |  | ||||||
| 	 case when trim(both from img)='' then null else  img::text  end  img  |  | ||||||
| 	, case when trim(both from not_acceptsms)='' then null else  not_acceptsms::text  end  not_acceptsms  |  | ||||||
| 	, case when trim(both from gender)='' then null else  gender::text  end  gender  |  | ||||||
| 	, case when trim(both from create_method)='' then null else  create_method::text  end  create_method  |  | ||||||
| 	, case when trim(both from mobile_verified)='' then null else  mobile_verified::text  end  mobile_verified  |  | ||||||
| 	, case when trim(both from language)='' then null else  language::text  end  language  |  | ||||||
| 	, case when trim(both from is_member)='' then null else  is_member::text  end  is_member  |  | ||||||
| 	, case when trim(both from source)='' then null else  source::text  end  source  |  | ||||||
| 	, case when trim(both from is_referrer)='' then null else  is_referrer::text  end  is_referrer  |  | ||||||
| 	, case when trim(both from create_from)='' then null else  create_from::text  end  create_from  |  | ||||||
| 	, case when trim(both from last_updated)='' then null else  last_updated::text  end  last_updated  |  | ||||||
| 	, case when trim(both from date_created)='' then null else  date_created::text  end  date_created  |  | ||||||
| 	, case when trim(both from acceptsms)='' then null else  acceptsms::text  end  acceptsms  |  | ||||||
| 	, case when trim(both from id)='' then null else  id::text  end  id  |  | ||||||
| 	, case when trim(both from subscribe_scene)='' then null else  subscribe_scene::text  end  subscribe_scene  |  | ||||||
| 	, case when trim(both from id_str)='' then null else  id_str::text  end  id_str  |  | ||||||
| 	, case when trim(both from is_employee)='' then null else  is_employee::text  end  is_employee  |  | ||||||
| 	, case when trim(both from version)='' then null else  version::text  end  version  |  | ||||||
| 	, case when trim(both from not_accept_email)='' then null else  not_accept_email::text  end  not_accept_email  |  | ||||||
| 	, case when trim(both from date_join)='' then null else  date_join::text  end  date_join  |  | ||||||
| 	, case when trim(both from content_name)='' then null else  content_name::text  end  content_name  |  | ||||||
| 	, case when trim(both from update_method)='' then null else  update_method::text  end  update_method  |  | ||||||
| 	, case when trim(both from email_verified)='' then null else  email_verified::text  end  email_verified  |  | ||||||
| 	, case when trim(both from is_anonymous)='' then null else  is_anonymous::text  end  is_anonymous  |  | ||||||
| 	, case when trim(both from accept_email)='' then null else  accept_email::text  end  accept_email  |  | ||||||
| 	, case when trim(both from create_from_name)='' then null else  create_from_name::text  end  create_from_name  |  | ||||||
| 	, case when trim(both from status)='' then null else  status::text  end  status  |  | ||||||
| ,etl_tx_dt |  | ||||||
| from ( |  | ||||||
| select  |  | ||||||
| 	 (json_array_elements(data::json)::json->>'img')  img  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'notAcceptsms')  not_acceptsms  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'gender')  gender  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'createMethod')  create_method  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'mobileVerified')  mobile_verified  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'language')  language  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'isMember')  is_member  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'source')  source  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'isReferrer')  is_referrer  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'createFrom')  create_from  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'lastUpdated')  last_updated  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'dateCreated')  date_created  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'acceptsms')  acceptsms  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'id')  id  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'subscribeScene')  subscribe_scene  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'idStr')  id_str  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'isEmployee')  is_employee  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'version')  version  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'notAcceptEmail')  not_accept_email  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'dateJoin')  date_join  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'contentName')  content_name  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'updateMethod')  update_method  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'emailVerified')  email_verified  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'isAnonymous')  is_anonymous  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'acceptEmail')  accept_email  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'createFromName')  create_from_name  |  | ||||||
| 	, (json_array_elements(data::json)::json->>'status')  status  |  | ||||||
| 	,CURRENT_TIMESTAMP(0)  etl_tx_dt |  | ||||||
| from (select * from data_api.api_data  |  | ||||||
| WHERE api_id='cab1a761-b8e5-4db4-a769-88db3ec1' and is_loaded = '0' order by request_tm desc limit 1) p )p; |  | ||||||
| 
 |  | ||||||
| update  data_api.api_data  |  | ||||||
| set is_loaded = '1' , |  | ||||||
| 	status = '1', |  | ||||||
| 	request_tm = current_timestamp(0) |  | ||||||
| where api_id='cab1a761-b8e5-4db4-a769-88db3ec1'; |  | ||||||
| \q |  | ||||||
		Loading…
	
		Reference in New Issue