diff --git a/dev/workflow/TK_Cust/agents_market_newsletter/news_letter/dysql_news_letter_info.sql b/dev/workflow/TK_Cust/agents_market_newsletter/news_letter/dysql_news_letter_info.sql new file mode 100644 index 0000000..d5e7183 --- /dev/null +++ b/dev/workflow/TK_Cust/agents_market_newsletter/news_letter/dysql_news_letter_info.sql @@ -0,0 +1,116 @@ +/********************************************************************************************/ +/*******Main Section**************************************************************************/ +\set ON_ERROR_STOP on +\set AUTOCOMMIT on +\timing on + +drop table if exists newsletter_t1; +create temporary table newsletter_t1 +as +select distinct on (p1.company_name,p1.newsletter_title) p1.company_name,p2.saler_name,p1.whitelist_flag,p1.newsletter_title +,p1.summary ,p1.tek_products ,p1.tk_relevance,p1.industry ,p1.prov_name ,p1.city_name ,p1.district,p1.source_url,p1.business_type,p1.create_tm +from (select case when p1.company_name_ai<>'' then p1.company_name_ai else p1.company_name end company_name +,case when p3.account_name is not null then '1' else '0' end whitelist_flag,p1.newsletter_title +,p1.summary ,p1.tek_products ,p1.tk_relevance,p2.industry ,p2.prov_name ,p2.city_name ,coalesce(p3.district,p1.district )district ,p1.source_url,p1.business_type,p1.create_tm +from p30_common.a_market_intelligence_newsletter p1 +left join p30_common.d_account_info p2 +on p1.company_name =p2."name" +or p1.company_name_ai =p2."name" +left join p30_common.white_list p3 +on (p1.company_name =p3.account_name +or p1.company_name_ai =p3.account_name) +and p3.as_of_date='2025-11-14' +where p2."name" is not null)p1 +left join p20_pdm.t00_china_city_pnum p2 +on p2.prov_name =p1.prov_name +and p2.city_name like p1.city_name||'%' +and ((p2.district like (substring(p1.district,1,2)||'%') and p1.district<>'') +or p2.district is null +or (p1.district ='' and p2.district like '%(客户未提供具体区域%') +); + + +drop table if exists newsletter_contact_t1; +create temporary table newsletter_contact_t1 +as +select distinct on (contact_id) cci.contact_id,contact,full_name,mobile,email,wechat,company,'' job_title,p2.active_time from p30_common.cust_contact_info cci +left join p30_common.v_cust_active_all p2 +on cci.contact_id =p2.contact_id +where company in (select distinct company_name from newsletter_t1); + +drop table if exists newsletter_funnel_one_t1; +create temporary table newsletter_funnel_one_t1 +as +select p1.record_id ,p1.apply_date ,contact_name ,contact_phone ,check_date::date ,'Partner' source_channel,case when region_name in ('华南','华东') then 'SE' when region_name in ('华北') then 'NW' else region_name end region_name + ,customer_name ,p1.product_name ,p1.product_type ,p1.progress ,rp_total_price*p2.target_currency_value /p2.currency_value as total_amt from p20_pdm.t01_partner_report p1 +left join p31_dim.d_currency p2 +on p2.currency_cd ='CNY' +and p2.target_currency_cd ='USD' +where check_status_str ='已通过' and customer_name in (select distinct company_name from newsletter_t1) ; + + +drop table if exists newsletter_funnel_crm_t1; +create temporary table newsletter_funnel_crm_t1 +as +select distinct on (p1.tek_opportunity_id) p1.tek_opportunity_id ,p1.create_time ::date create_date,p5.last_name||p5.first_name contact_name,p5.mobile_phone contact_phone,p1.as_of_date::date,'CRM' source_channel,case when upper(p1.area_id) like '%SOUTH%' then 'SE' when upper(p1.area_id) like '%NORTH%' then 'NW' else p1.area_id end area_id,coalesce(p2.name,p1.parent_account_name) account_name,p1.product_name ,split_part(product_class, ':', 1) product_type,p1.state,p1.total_amount/p4.currency_value*p4.target_currency_value total_amount +from p30_common.crm_opportunity_weekly p1 +left join p20_pdm.t01_crm_account p2 +on p1.account_number =p2.accountnumber +left join p31_dim.d_currency p4 +on p1.currency_name=p4.currency_name +and p4.target_currency_cd ='USD' +left join p20_pdm.t01_crm_contact p5 +on p1.crm_contact_number =p5.crm_contact_account +where (p1.product_class like '2025%' or p1.product_class like '2035%' or p1.product_class like '2610%' or p1.product_class like '2026%' or p1.product_class like '2036%') and coalesce(p2.name,p1.parent_account_name) in (select distinct company_name from newsletter_t1) +order by p1.tek_opportunity_id,status_last_update desc,p1.as_of_date asc; + +delete from p60_mart.news_letter_info; +INSERT INTO p60_mart.news_letter_info +(account_name, saler_name, whitelist_flag, newsletter_title, summary, key_products, tek_relevance, industry, prov_name, city_name, district, contact_qty, total_funnel, report_amt,source_url,business_type,create_tm) +select distinct on (p1.company_name,p1.source_url) p1.company_name account_name, coalesce(saler_name,''), whitelist_flag, newsletter_title, summary +, case when p5.data_json::text like '{%' then replace(replace(replace(p5.data_json->>'主营产品'::text,'"',''),'[',''),']','') +when p5.data_json::text like '[%' then replace(replace(replace(p5.data_json::text,'"',''),'[',''),']','') else '' end +,coalesce(p5.recomend_product_series::text,'') tek_relevance, industry, coalesce(prov_name,''), coalesce(city_name), district,coalesce(p2.contact_qty,0) contact_qty, coalesce(total_amt,0)+coalesce(total_amount,0) total_funnel,coalesce(total_amt,0) report_amt,p1.source_url,p1.business_type,p1.create_tm from newsletter_t1 p1 +left join (select company,count(*) contact_qty from newsletter_contact_t1 group by 1 ) p2 +on p1.company_name=p2.company +left join (select customer_name,sum(total_amt) total_amt from newsletter_funnel_one_t1 group by 1) p3 +on p1.company_name=p3.customer_name +left join (select account_name,sum(total_amount) total_amount from newsletter_funnel_crm_t1 group by 1) p4 +on p1.company_name=p4.account_name +left join p70_ai_intelligence.agent_recommendation_cache p5 +on p1.company_name=p5.company_name +order by p1.company_name,p1.source_url,p1.create_tm desc,p5.created_at desc; + +drop table if exists newsletter_funnel_one_t2; +create temporary table newsletter_funnel_one_t2 +as +select p1.record_id ,p1.dealer_name ,p1.apply_admin_name ,p1.apply_date ,contact_name ,contact_phone ,check_date::date ,'Partner' source_channel,case when region_name in ('华南','华东') then 'SE' when region_name in ('华北') then 'NW' else region_name end region_name + ,customer_name ,p1.product_name ,p1.product_type ,p1.progress ,rp_total_price*p2.target_currency_value /p2.currency_value as total_amt from p20_pdm.t01_partner_report p1 +left join p31_dim.d_currency p2 +on p2.currency_cd ='CNY' +and p2.target_currency_cd ='USD' +where check_status_str ='已通过' and customer_name in (select distinct company_name from newsletter_t1) ; + + +delete from p60_mart.news_letter_contact_info; +insert into p60_mart.news_letter_contact_info +--drop table p60_mart.news_letter_contact_info; +--create table p60_mart.news_letter_contact_info +--as +select * from newsletter_contact_t1 +; + +delete from p60_mart.news_letter_funnel_info; +insert into p60_mart.news_letter_funnel_info +select * from newsletter_funnel_crm_t1 +union all +select * from newsletter_funnel_one_t1 +; +delete from p60_mart.news_letter_report_info; +insert into p60_mart.news_letter_report_info +select * from newsletter_funnel_one_t2 + + + + +\q \ No newline at end of file diff --git a/dev/workflow/TK_Cust/agents_market_newsletter/市场-Agents调度/wf_dag_agents_market_newsletter.py b/dev/workflow/TK_Cust/agents_market_newsletter/市场-Agents调度/wf_dag_agents_market_newsletter.py index cb0b5b7..1857feb 100644 --- a/dev/workflow/TK_Cust/agents_market_newsletter/市场-Agents调度/wf_dag_agents_market_newsletter.py +++ b/dev/workflow/TK_Cust/agents_market_newsletter/市场-Agents调度/wf_dag_agents_market_newsletter.py @@ -98,9 +98,22 @@ params={'my_param':"a_market_Intelligence_newsletter_agi"}, depends_on_past=False, retries=3, dag=dag) + +dysql_news_letter_info = SSHOperator( +ssh_hook=sshHook, +task_id='dysql_news_letter_info', +command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} ', +params={'my_param':"dysql_news_letter_info"}, +depends_on_past=False, +retries=3, +dag=dag) + + + agents_market_summar >> tk_product_prefer agents_market_summar >> batch_gen_crawer agents_recuriments >> batch_gen_recruiment batch_gen_crawer >> a_market_Intelligence_newsletter a_market_Intelligence_newsletter >> tk_product_prefer -tk_product_prefer >> task_failed +tk_product_prefer >> dysql_news_letter_info +dysql_news_letter_info >> task_failed