add workflow 市场News-Agents调度,dev

This commit is contained in:
root 2026-01-11 19:12:00 +08:00
parent af59ab79cc
commit 182e16126c
3 changed files with 265 additions and 1 deletions

View File

@ -0,0 +1,118 @@
/********************************************************************************************/
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
drop table if exists enterprise_card_t1;
create temporary table enterprise_card_t1
as
select distinct on (p1.company_name,p1.enterprise_card_title) p1.company_name,p2.saler_name,p1.whitelist_flag,p1.enterprise_card_title
,p1.summary ,p1.tek_products ,p1.tk_relevance,p1.industry ,p1.prov_name ,p1.city_name ,p1.district,p1.source_url,p1.business_type,p1.create_tm
from (select p1.company company_name
,case when p3.account_name is not null then '1' else '0' end whitelist_flag,p1.content_title enterprise_card_title
,p1.summary ,p1.recommended_products tek_products,p1.relevance_analysis tk_relevance,p2.industry ,p2.prov_name ,p2.city_name ,coalesce(p3.district)district ,p1.url source_url,'招聘信息' business_type,p1.created_at create_tm
from p70_ai_intelligence.agent_enterprise_card_summary p1
left join p30_common.d_account_info p2
on p1.company =p2."name"
left join p30_common.white_list p3
on (p1.company =p3.account_name)
and p3.as_of_date='2025-12-19'
where p2."name" is not null)p1
left join p20_pdm.t00_china_city_pnum p2
on p2.prov_name =p1.prov_name
and p2.city_name like p1.city_name||'%'
and ((p2.district like (substring(p1.district,1,2)||'%') and p1.district<>'')
or p2.district is null
or (p1.district ='' and p2.district like '%(客户未提供具体区域%')
);
drop table if exists enterprise_card_contact_t1;
create temporary table enterprise_card_contact_t1
as
select distinct on (contact_id) cci.contact_id,contact,full_name,mobile,email,wechat,company,'' job_title,p2.active_time,'enterprise_card_summary' source_system from p30_common.cust_contact_info cci
left join p30_common.v_cust_active_all p2
on cci.contact_id =p2.contact_id
where company in (select distinct company_name from enterprise_card_t1);
drop table if exists enterprise_card_funnel_one_t1;
create temporary table enterprise_card_funnel_one_t1
as
select p1.record_id ,p1.apply_date ,contact_name ,contact_phone ,check_date::date ,'Partner' source_channel,case when region_name in ('华南','华东') then 'SE' when region_name in ('华北') then 'NW' else region_name end region_name
,customer_name ,p1.product_name ,p1.product_type ,p1.progress ,rp_total_price*p2.target_currency_value /p2.currency_value as total_amt,'enterprise_card_summary' source_system from p20_pdm.t01_partner_report p1
left join p31_dim.d_currency p2
on p2.currency_cd ='CNY'
and p2.target_currency_cd ='USD'
where check_status_str ='已通过' and customer_name in (select distinct company_name from enterprise_card_t1) ;
drop table if exists enterprise_card_funnel_crm_t1;
create temporary table enterprise_card_funnel_crm_t1
as
select distinct on (p1.tek_opportunity_id) p1.tek_opportunity_id ,p1.create_time ::date create_date,p5.last_name||p5.first_name contact_name,p5.mobile_phone contact_phone,p1.as_of_date::date,'CRM' source_channel,case when upper(p1.area_id) like '%SOUTH%' then 'SE' when upper(p1.area_id) like '%NORTH%' then 'NW' else p1.area_id end area_id,coalesce(p2.name,p1.parent_account_name) account_name,p1.product_name ,split_part(product_class, ':', 1) product_type,p1.state,p1.total_amount/p4.currency_value*p4.target_currency_value total_amount,'enterprise_card_summary' source_system
from p30_common.crm_opportunity_weekly p1
left join p20_pdm.t01_crm_account p2
on p1.account_number =p2.accountnumber
left join p31_dim.d_currency p4
on p1.currency_name=p4.currency_name
and p4.target_currency_cd ='USD'
left join p20_pdm.t01_crm_contact p5
on p1.crm_contact_number =p5.crm_contact_account
where (p1.product_class like '2025%' or p1.product_class like '2035%' or p1.product_class like '2610%' or p1.product_class like '2026%' or p1.product_class like '2036%') and coalesce(p2.name,p1.parent_account_name) in (select distinct company_name from enterprise_card_t1)
order by p1.tek_opportunity_id,status_last_update desc,p1.as_of_date asc;
delete from p60_mart.news_letter_info
where source_system='enterprise_card_summary';
INSERT INTO p60_mart.news_letter_info
(account_name, saler_name, whitelist_flag, newsletter_title, summary, key_products, tek_relevance, industry, prov_name, city_name, district, contact_qty, total_funnel, report_amt,source_url,business_type,create_tm,source_system)
select distinct on (p1.company_name,p1.source_url) p1.company_name account_name, coalesce(saler_name,''), whitelist_flag, enterprise_card_title, summary
, case when p5.data_json::text like '{%' then replace(replace(replace(p5.data_json->>'主营产品'::text,'"',''),'[',''),']','')
when p5.data_json::text like '[%' then replace(replace(replace(p5.data_json::text,'"',''),'[',''),']','') else '' end
,coalesce(p5.recomend_product_series::text,'') tek_relevance, industry, coalesce(prov_name,''), coalesce(city_name), district,coalesce(p2.contact_qty,0) contact_qty, coalesce(total_amt,0)+coalesce(total_amount,0) total_funnel,coalesce(total_amt,0) report_amt,p1.source_url,p1.business_type,p1.create_tm,'enterprise_card_summary' source_system from enterprise_card_t1 p1
left join (select company,count(*) contact_qty from enterprise_card_contact_t1 group by 1 ) p2
on p1.company_name=p2.company
left join (select customer_name,sum(total_amt) total_amt from enterprise_card_funnel_one_t1 group by 1) p3
on p1.company_name=p3.customer_name
left join (select account_name,sum(total_amount) total_amount from enterprise_card_funnel_crm_t1 group by 1) p4
on p1.company_name=p4.account_name
left join p70_ai_intelligence.agent_recommendation_cache p5
on p1.company_name=p5.company_name
order by p1.company_name,p1.source_url,p1.create_tm desc,p5.created_at desc;
drop table if exists enterprise_card_funnel_one_t2;
create temporary table enterprise_card_funnel_one_t2
as
select p1.record_id ,p1.dealer_name ,p1.apply_admin_name ,p1.apply_date ,contact_name ,contact_phone ,check_date::date ,'Partner' source_channel,case when region_name in ('华南','华东') then 'SE' when region_name in ('华北') then 'NW' else region_name end region_name
,customer_name ,p1.product_name ,p1.product_type ,p1.progress ,rp_total_price*p2.target_currency_value /p2.currency_value as total_amt,'enterprise_card_summary' source_system from p20_pdm.t01_partner_report p1
left join p31_dim.d_currency p2
on p2.currency_cd ='CNY'
and p2.target_currency_cd ='USD'
where check_status_str ='已通过' and customer_name in (select distinct company_name from enterprise_card_t1) ;
delete from p60_mart.news_letter_contact_info
where source_system='enterprise_card_summary';
insert into p60_mart.news_letter_contact_info
--drop table p60_mart.news_letter_contact_info;
--create table p60_mart.news_letter_contact_info
--as
select * from enterprise_card_contact_t1
;
delete from p60_mart.news_letter_funnel_info
where source_system='enterprise_card_summary';
insert into p60_mart.news_letter_funnel_info
select * from enterprise_card_funnel_crm_t1
union all
select * from enterprise_card_funnel_one_t1
;
delete from p60_mart.news_letter_report_info
where source_system='enterprise_card_summary';
insert into p60_mart.news_letter_report_info
select * from enterprise_card_funnel_one_t2
\q

View File

@ -0,0 +1,118 @@
/********************************************************************************************/
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
drop table if exists recruitment_t1;
create temporary table recruitment_t1
as
select distinct on (p1.company_name,p1.recruitment_title) p1.company_name,p2.saler_name,p1.whitelist_flag,p1.recruitment_title
,p1.summary ,p1.tek_products ,p1.tk_relevance,p1.industry ,p1.prov_name ,p1.city_name ,p1.district,p1.source_url,p1.business_type,p1.create_tm
from (select p1.company company_name
,case when p3.account_name is not null then '1' else '0' end whitelist_flag,p1.content_title recruitment_title
,p1.summary ,p1.recommended_products tek_products,p1.relevance_analysis tk_relevance,p2.industry ,p2.prov_name ,p2.city_name ,coalesce(p3.district)district ,p1.url source_url,'招聘信息' business_type,p1.created_at create_tm
from p70_ai_intelligence.agent_recruitment_summary p1
left join p30_common.d_account_info p2
on p1.company =p2."name"
left join p30_common.white_list p3
on (p1.company =p3.account_name)
and p3.as_of_date='2025-12-19'
where p2."name" is not null)p1
left join p20_pdm.t00_china_city_pnum p2
on p2.prov_name =p1.prov_name
and p2.city_name like p1.city_name||'%'
and ((p2.district like (substring(p1.district,1,2)||'%') and p1.district<>'')
or p2.district is null
or (p1.district ='' and p2.district like '%(客户未提供具体区域%')
);
drop table if exists recruitment_contact_t1;
create temporary table recruitment_contact_t1
as
select distinct on (contact_id) cci.contact_id,contact,full_name,mobile,email,wechat,company,'' job_title,p2.active_time,'recruitment_summary' source_system from p30_common.cust_contact_info cci
left join p30_common.v_cust_active_all p2
on cci.contact_id =p2.contact_id
where company in (select distinct company_name from recruitment_t1);
drop table if exists recruitment_funnel_one_t1;
create temporary table recruitment_funnel_one_t1
as
select p1.record_id ,p1.apply_date ,contact_name ,contact_phone ,check_date::date ,'Partner' source_channel,case when region_name in ('华南','华东') then 'SE' when region_name in ('华北') then 'NW' else region_name end region_name
,customer_name ,p1.product_name ,p1.product_type ,p1.progress ,rp_total_price*p2.target_currency_value /p2.currency_value as total_amt,'recruitment_summary' source_system from p20_pdm.t01_partner_report p1
left join p31_dim.d_currency p2
on p2.currency_cd ='CNY'
and p2.target_currency_cd ='USD'
where check_status_str ='已通过' and customer_name in (select distinct company_name from recruitment_t1) ;
drop table if exists recruitment_funnel_crm_t1;
create temporary table recruitment_funnel_crm_t1
as
select distinct on (p1.tek_opportunity_id) p1.tek_opportunity_id ,p1.create_time ::date create_date,p5.last_name||p5.first_name contact_name,p5.mobile_phone contact_phone,p1.as_of_date::date,'CRM' source_channel,case when upper(p1.area_id) like '%SOUTH%' then 'SE' when upper(p1.area_id) like '%NORTH%' then 'NW' else p1.area_id end area_id,coalesce(p2.name,p1.parent_account_name) account_name,p1.product_name ,split_part(product_class, ':', 1) product_type,p1.state,p1.total_amount/p4.currency_value*p4.target_currency_value total_amount,'recruitment_summary' source_system
from p30_common.crm_opportunity_weekly p1
left join p20_pdm.t01_crm_account p2
on p1.account_number =p2.accountnumber
left join p31_dim.d_currency p4
on p1.currency_name=p4.currency_name
and p4.target_currency_cd ='USD'
left join p20_pdm.t01_crm_contact p5
on p1.crm_contact_number =p5.crm_contact_account
where (p1.product_class like '2025%' or p1.product_class like '2035%' or p1.product_class like '2610%' or p1.product_class like '2026%' or p1.product_class like '2036%') and coalesce(p2.name,p1.parent_account_name) in (select distinct company_name from recruitment_t1)
order by p1.tek_opportunity_id,status_last_update desc,p1.as_of_date asc;
delete from p60_mart.news_letter_info
where source_system='recruitment_summary';
INSERT INTO p60_mart.news_letter_info
(account_name, saler_name, whitelist_flag, newsletter_title, summary, key_products, tek_relevance, industry, prov_name, city_name, district, contact_qty, total_funnel, report_amt,source_url,business_type,create_tm,source_system)
select distinct on (p1.company_name,p1.source_url) p1.company_name account_name, coalesce(saler_name,''), whitelist_flag, recruitment_title, summary
, case when p5.data_json::text like '{%' then replace(replace(replace(p5.data_json->>'主营产品'::text,'"',''),'[',''),']','')
when p5.data_json::text like '[%' then replace(replace(replace(p5.data_json::text,'"',''),'[',''),']','') else '' end
,coalesce(p5.recomend_product_series::text,'') tek_relevance, industry, coalesce(prov_name,''), coalesce(city_name), district,coalesce(p2.contact_qty,0) contact_qty, coalesce(total_amt,0)+coalesce(total_amount,0) total_funnel,coalesce(total_amt,0) report_amt,p1.source_url,p1.business_type,p1.create_tm,'recruitment_summary' source_system from recruitment_t1 p1
left join (select company,count(*) contact_qty from recruitment_contact_t1 group by 1 ) p2
on p1.company_name=p2.company
left join (select customer_name,sum(total_amt) total_amt from recruitment_funnel_one_t1 group by 1) p3
on p1.company_name=p3.customer_name
left join (select account_name,sum(total_amount) total_amount from recruitment_funnel_crm_t1 group by 1) p4
on p1.company_name=p4.account_name
left join p70_ai_intelligence.agent_recommendation_cache p5
on p1.company_name=p5.company_name
order by p1.company_name,p1.source_url,p1.create_tm desc,p5.created_at desc;
drop table if exists recruitment_funnel_one_t2;
create temporary table recruitment_funnel_one_t2
as
select p1.record_id ,p1.dealer_name ,p1.apply_admin_name ,p1.apply_date ,contact_name ,contact_phone ,check_date::date ,'Partner' source_channel,case when region_name in ('华南','华东') then 'SE' when region_name in ('华北') then 'NW' else region_name end region_name
,customer_name ,p1.product_name ,p1.product_type ,p1.progress ,rp_total_price*p2.target_currency_value /p2.currency_value as total_amt,'recruitment_summary' source_system from p20_pdm.t01_partner_report p1
left join p31_dim.d_currency p2
on p2.currency_cd ='CNY'
and p2.target_currency_cd ='USD'
where check_status_str ='已通过' and customer_name in (select distinct company_name from recruitment_t1) ;
delete from p60_mart.news_letter_contact_info
where source_system='recruitment_summary';
insert into p60_mart.news_letter_contact_info
--drop table p60_mart.news_letter_contact_info;
--create table p60_mart.news_letter_contact_info
--as
select * from recruitment_contact_t1
;
delete from p60_mart.news_letter_funnel_info
where source_system='recruitment_summary';
insert into p60_mart.news_letter_funnel_info
select * from recruitment_funnel_crm_t1
union all
select * from recruitment_funnel_one_t1
;
delete from p60_mart.news_letter_report_info
where source_system='recruitment_summary';
insert into p60_mart.news_letter_report_info
select * from recruitment_funnel_one_t2
\q

View File

@ -110,10 +110,38 @@ dag=dag)
dysql_enterprise_card_newsletter = SSHOperator(
ssh_hook=sshHook,
task_id='dysql_enterprise_card_newsletter',
command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} ',
params={'my_param':"dysql_enterprise_card_newsletter"},
depends_on_past=False,
retries=3,
dag=dag)
dysql_recruitment_news_letter = SSHOperator(
ssh_hook=sshHook,
task_id='dysql_recruitment_news_letter',
command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} ',
params={'my_param':"dysql_recruitment_news_letter"},
depends_on_past=False,
retries=3,
dag=dag)
agents_market_summar >> tk_product_prefer
agents_market_summar >> batch_gen_crawer
agents_recuriments >> batch_gen_recruiment
a_market_Intelligence_newsletter >> tk_product_prefer
tk_product_prefer >> dysql_news_letter_info
agents_market_summar >> a_market_Intelligence_newsletter
a_market_Intelligence_newsletter >> task_failed
tk_product_prefer >> dysql_recruitment_news_letter
dysql_enterprise_card_newsletter >> dysql_recruitment_news_letter
agents_recuriments >> tk_product_prefer
tk_product_prefer >> dysql_enterprise_card_newsletter
dysql_enterprise_card_newsletter >> task_failed