add workflow 白名单数据入仓,dev

This commit is contained in:
root 2025-10-21 16:08:56 +08:00
parent 49646fa936
commit 82a38c022f
4 changed files with 300 additions and 0 deletions

View File

@ -0,0 +1,102 @@
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
delete from p10_sa.S98_S_tk_white_list
;
insert into p10_sa.S98_S_tk_white_list
( extract_date
, account_name
, keys
, city
, district
, industry
, sub_industry
, tags
, tsm
, region
, custom_type
, url
, market_capitalization
, order_last_3y
, pos_last_3y
, total
, open_funnel
, contact
, financing_round
, open_lab_visit
, etl_tx_dt )
select
extract_date
, account_name
, keys
, city
, district
, industry
, sub_industry
, tags
, tsm
, region
, custom_type
, url
, market_capitalization
, order_last_3y
, pos_last_3y
, total
, open_funnel
, contact
, financing_round
, open_lab_visit
, etl_tx_dt
from p00_tal.S98_S_tk_white_list
;
delete from p12_sfull.S98_S_tk_white_list
;
;
insert into p12_sfull.S98_S_tk_white_list
( extract_date
, account_name
, keys
, city
, district
, industry
, sub_industry
, tags
, tsm
, region
, custom_type
, url
, market_capitalization
, order_last_3y
, pos_last_3y
, total
, open_funnel
, contact
, financing_round
, open_lab_visit
, etl_tx_dt )
select
extract_date
, account_name
, keys
, city
, district
, industry
, sub_industry
, tags
, tsm
, region
, custom_type
, url
, market_capitalization
, order_last_3y
, pos_last_3y
, total
, open_funnel
, contact
, financing_round
, open_lab_visit
, etl_tx_dt
from p10_sa.S98_S_tk_white_list
;
\q

View File

@ -0,0 +1,32 @@
CREATE FOREIGN TABLE if not exists p00_tal.S98_S_tk_white_list (
extract_date TEXT
, account_name TEXT
, keys TEXT
, city TEXT
, district TEXT
, industry TEXT
, sub_industry TEXT
, tags TEXT
, tsm TEXT
, region TEXT
, custom_type TEXT
, url TEXT
, market_capitalization TEXT
, order_last_3y TEXT
, pos_last_3y TEXT
, total TEXT
, open_funnel TEXT
, contact TEXT
, financing_round TEXT
, open_lab_visit TEXT
, etl_tx_dt TIMESTAMP
)
SERVER pgsql_server_S98_S OPTIONS(schema_name 'data_api', table_name 'tk_white_list' );

View File

@ -0,0 +1,105 @@
create table if not exists p10_sa.S98_S_tk_white_list (
extract_date TEXT
, account_name TEXT
, keys TEXT
, city TEXT
, district TEXT
, industry TEXT
, sub_industry TEXT
, tags TEXT
, tsm TEXT
, region TEXT
, custom_type TEXT
, url TEXT
, market_capitalization TEXT
, order_last_3y TEXT
, pos_last_3y TEXT
, total TEXT
, open_funnel TEXT
, contact TEXT
, financing_round TEXT
, open_lab_visit TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.extract_date IS '日期';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.account_name IS '客户名称';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.keys IS '关键字';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.city IS '城市';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.district IS '区县';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.industry IS '大行业';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.sub_industry IS '小行业';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.tags IS '关键词';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.tsm IS 'TSM';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.region IS 'Region';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.custom_type IS '客户类型';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.url IS '官网链接';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.market_capitalization IS '市值';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.order_last_3y IS '过去3年
Order';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.pos_last_3y IS '过去3年
POS';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.total IS 'Total';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.open_funnel IS 'Open Funnel';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.contact IS 'of联系人';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.financing_round IS '融资轮次';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.open_lab_visit IS 'Open Lab
Visit';
COMMENT ON COLUMN p10_sa.S98_S_tk_white_list.etl_tx_dt IS '';
COMMENT ON TABLE p10_sa.S98_S_tk_white_list IS '';
create table if not exists p12_sfull.S98_S_tk_white_list (
extract_date TEXT
, account_name TEXT
, keys TEXT
, city TEXT
, district TEXT
, industry TEXT
, sub_industry TEXT
, tags TEXT
, tsm TEXT
, region TEXT
, custom_type TEXT
, url TEXT
, market_capitalization TEXT
, order_last_3y TEXT
, pos_last_3y TEXT
, total TEXT
, open_funnel TEXT
, contact TEXT
, financing_round TEXT
, open_lab_visit TEXT
, etl_tx_dt TIMESTAMP
) ;
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.extract_date IS '日期';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.account_name IS '客户名称';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.keys IS '关键字';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.city IS '城市';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.district IS '区县';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.industry IS '大行业';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.sub_industry IS '小行业';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.tags IS '关键词';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.tsm IS 'TSM';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.region IS 'Region';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.custom_type IS '客户类型';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.url IS '官网链接';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.market_capitalization IS '市值';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.order_last_3y IS '过去3年
Order';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.pos_last_3y IS '过去3年
POS';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.total IS 'Total';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.open_funnel IS 'Open Funnel';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.contact IS 'of联系人';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.financing_round IS '融资轮次';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.open_lab_visit IS 'Open Lab
Visit';
COMMENT ON COLUMN p12_sfull.S98_S_tk_white_list.etl_tx_dt IS '';
COMMENT ON TABLE p12_sfull.S98_S_tk_white_list IS '';

View File

@ -0,0 +1,61 @@
#!/usr/bin/python
# -*- encoding=utf-8 -*-
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import json
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
sshHook = SSHHook(ssh_conn_id ='ssh_air')
default_args = {
'owner': 'tek_newsletter@163.com',
'email_on_failure': True,
'email_on_retry':True,
'start_date': datetime(2024, 1, 1),
'depends_on_past': False,
'retries': 6,
'retry_delay': timedelta(minutes=10),
}
dag = DAG('wf_dag_white_list', default_args=default_args,
schedule=None,
catchup=False,
dagrun_timeout=timedelta(minutes=160),
max_active_runs=3)
task_failed = EmailOperator (
dag=dag,
trigger_rule=TriggerRule.ONE_FAILED,
task_id="task_failed",
to=["tek_newsletter@163.com"],
cc=[""],
subject="white_list_failed",
html_content='<h3>您好white_list作业失败请及时处理" </h3>')
file_tk_white_list = SSHOperator(
ssh_hook=sshHook,
task_id='file_tk_white_list',
command='python /data/airflow/bin/FILELOD.py {{ params.my_param }} {{ ds_nodash }} ',
params={'my_param':"tk_white_list"},
depends_on_past=False,
retries=3,
dag=dag)
tk_white_list_4460 = SSHOperator(
ssh_hook=sshHook,
task_id='tk_white_list_4460',
command='/data/airflow/etl/SA/run_sa.sh {{ ds_nodash }} {{ params.my_param }} ',
params={'my_param':"S98_S_tk_white_list"},
depends_on_past=False,
retries=3,
dag=dag)
file_tk_white_list >> tk_white_list_4460
tk_white_list_4460 >> task_failed