add workflow 市场-Agents调度,dev

This commit is contained in:
root 2025-12-09 18:29:05 +08:00
parent b35a4b83e1
commit 4b939dbf07
2 changed files with 130 additions and 1 deletions

View File

@ -0,0 +1,116 @@
/********************************************************************************************/
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
drop table if exists newsletter_t1;
create temporary table newsletter_t1
as
select distinct on (p1.company_name,p1.newsletter_title) p1.company_name,p2.saler_name,p1.whitelist_flag,p1.newsletter_title
,p1.summary ,p1.tek_products ,p1.tk_relevance,p1.industry ,p1.prov_name ,p1.city_name ,p1.district,p1.source_url,p1.business_type,p1.create_tm
from (select case when p1.company_name_ai<>'' then p1.company_name_ai else p1.company_name end company_name
,case when p3.account_name is not null then '1' else '0' end whitelist_flag,p1.newsletter_title
,p1.summary ,p1.tek_products ,p1.tk_relevance,p2.industry ,p2.prov_name ,p2.city_name ,coalesce(p3.district,p1.district )district ,p1.source_url,p1.business_type,p1.create_tm
from p30_common.a_market_intelligence_newsletter p1
left join p30_common.d_account_info p2
on p1.company_name =p2."name"
or p1.company_name_ai =p2."name"
left join p30_common.white_list p3
on (p1.company_name =p3.account_name
or p1.company_name_ai =p3.account_name)
and p3.as_of_date='2025-11-14'
where p2."name" is not null)p1
left join p20_pdm.t00_china_city_pnum p2
on p2.prov_name =p1.prov_name
and p2.city_name like p1.city_name||'%'
and ((p2.district like (substring(p1.district,1,2)||'%') and p1.district<>'')
or p2.district is null
or (p1.district ='' and p2.district like '%(客户未提供具体区域%')
);
drop table if exists newsletter_contact_t1;
create temporary table newsletter_contact_t1
as
select distinct on (contact_id) cci.contact_id,contact,full_name,mobile,email,wechat,company,'' job_title,p2.active_time from p30_common.cust_contact_info cci
left join p30_common.v_cust_active_all p2
on cci.contact_id =p2.contact_id
where company in (select distinct company_name from newsletter_t1);
drop table if exists newsletter_funnel_one_t1;
create temporary table newsletter_funnel_one_t1
as
select p1.record_id ,p1.apply_date ,contact_name ,contact_phone ,check_date::date ,'Partner' source_channel,case when region_name in ('华南','华东') then 'SE' when region_name in ('华北') then 'NW' else region_name end region_name
,customer_name ,p1.product_name ,p1.product_type ,p1.progress ,rp_total_price*p2.target_currency_value /p2.currency_value as total_amt from p20_pdm.t01_partner_report p1
left join p31_dim.d_currency p2
on p2.currency_cd ='CNY'
and p2.target_currency_cd ='USD'
where check_status_str ='已通过' and customer_name in (select distinct company_name from newsletter_t1) ;
drop table if exists newsletter_funnel_crm_t1;
create temporary table newsletter_funnel_crm_t1
as
select distinct on (p1.tek_opportunity_id) p1.tek_opportunity_id ,p1.create_time ::date create_date,p5.last_name||p5.first_name contact_name,p5.mobile_phone contact_phone,p1.as_of_date::date,'CRM' source_channel,case when upper(p1.area_id) like '%SOUTH%' then 'SE' when upper(p1.area_id) like '%NORTH%' then 'NW' else p1.area_id end area_id,coalesce(p2.name,p1.parent_account_name) account_name,p1.product_name ,split_part(product_class, ':', 1) product_type,p1.state,p1.total_amount/p4.currency_value*p4.target_currency_value total_amount
from p30_common.crm_opportunity_weekly p1
left join p20_pdm.t01_crm_account p2
on p1.account_number =p2.accountnumber
left join p31_dim.d_currency p4
on p1.currency_name=p4.currency_name
and p4.target_currency_cd ='USD'
left join p20_pdm.t01_crm_contact p5
on p1.crm_contact_number =p5.crm_contact_account
where (p1.product_class like '2025%' or p1.product_class like '2035%' or p1.product_class like '2610%' or p1.product_class like '2026%' or p1.product_class like '2036%') and coalesce(p2.name,p1.parent_account_name) in (select distinct company_name from newsletter_t1)
order by p1.tek_opportunity_id,status_last_update desc,p1.as_of_date asc;
delete from p60_mart.news_letter_info;
INSERT INTO p60_mart.news_letter_info
(account_name, saler_name, whitelist_flag, newsletter_title, summary, key_products, tek_relevance, industry, prov_name, city_name, district, contact_qty, total_funnel, report_amt,source_url,business_type,create_tm)
select distinct on (p1.company_name,p1.source_url) p1.company_name account_name, coalesce(saler_name,''), whitelist_flag, newsletter_title, summary
, case when p5.data_json::text like '{%' then replace(replace(replace(p5.data_json->>'主营产品'::text,'"',''),'[',''),']','')
when p5.data_json::text like '[%' then replace(replace(replace(p5.data_json::text,'"',''),'[',''),']','') else '' end
,coalesce(p5.recomend_product_series::text,'') tek_relevance, industry, coalesce(prov_name,''), coalesce(city_name), district,coalesce(p2.contact_qty,0) contact_qty, coalesce(total_amt,0)+coalesce(total_amount,0) total_funnel,coalesce(total_amt,0) report_amt,p1.source_url,p1.business_type,p1.create_tm from newsletter_t1 p1
left join (select company,count(*) contact_qty from newsletter_contact_t1 group by 1 ) p2
on p1.company_name=p2.company
left join (select customer_name,sum(total_amt) total_amt from newsletter_funnel_one_t1 group by 1) p3
on p1.company_name=p3.customer_name
left join (select account_name,sum(total_amount) total_amount from newsletter_funnel_crm_t1 group by 1) p4
on p1.company_name=p4.account_name
left join p70_ai_intelligence.agent_recommendation_cache p5
on p1.company_name=p5.company_name
order by p1.company_name,p1.source_url,p1.create_tm desc,p5.created_at desc;
drop table if exists newsletter_funnel_one_t2;
create temporary table newsletter_funnel_one_t2
as
select p1.record_id ,p1.dealer_name ,p1.apply_admin_name ,p1.apply_date ,contact_name ,contact_phone ,check_date::date ,'Partner' source_channel,case when region_name in ('华南','华东') then 'SE' when region_name in ('华北') then 'NW' else region_name end region_name
,customer_name ,p1.product_name ,p1.product_type ,p1.progress ,rp_total_price*p2.target_currency_value /p2.currency_value as total_amt from p20_pdm.t01_partner_report p1
left join p31_dim.d_currency p2
on p2.currency_cd ='CNY'
and p2.target_currency_cd ='USD'
where check_status_str ='已通过' and customer_name in (select distinct company_name from newsletter_t1) ;
delete from p60_mart.news_letter_contact_info;
insert into p60_mart.news_letter_contact_info
--drop table p60_mart.news_letter_contact_info;
--create table p60_mart.news_letter_contact_info
--as
select * from newsletter_contact_t1
;
delete from p60_mart.news_letter_funnel_info;
insert into p60_mart.news_letter_funnel_info
select * from newsletter_funnel_crm_t1
union all
select * from newsletter_funnel_one_t1
;
delete from p60_mart.news_letter_report_info;
insert into p60_mart.news_letter_report_info
select * from newsletter_funnel_one_t2
\q

View File

@ -98,9 +98,22 @@ params={'my_param':"a_market_Intelligence_newsletter_agi"},
depends_on_past=False,
retries=3,
dag=dag)
dysql_news_letter_info = SSHOperator(
ssh_hook=sshHook,
task_id='dysql_news_letter_info',
command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} ',
params={'my_param':"dysql_news_letter_info"},
depends_on_past=False,
retries=3,
dag=dag)
agents_market_summar >> tk_product_prefer
agents_market_summar >> batch_gen_crawer
agents_recuriments >> batch_gen_recruiment
batch_gen_crawer >> a_market_Intelligence_newsletter
a_market_Intelligence_newsletter >> tk_product_prefer
tk_product_prefer >> task_failed
tk_product_prefer >> dysql_news_letter_info
dysql_news_letter_info >> task_failed