add workflow 荟聚API_2,dev

This commit is contained in:
root 2024-06-18 15:51:58 +08:00
parent f3e6e3883e
commit 4eb1c54862
7 changed files with 3043 additions and 1 deletions

View File

@ -0,0 +1,51 @@
#!/usr/bin/python
# -*- encoding=utf-8 -*-
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
import json
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
sshHook = SSHHook(ssh_conn_id ='ssh_85_air')
default_args = {
'owner': 'sewp_dev',
'email': [],
'start_date': datetime(2022, 9, 12),
'depends_on_past': False,
'retries': 6,
'retry_delay': timedelta(minutes=10),
}
dag = DAG('Dag_Sql_dq_result', default_args=default_args,
schedule_interval="${cronExp}",
catchup=False,
dagrun_timeout=timedelta(minutes=160),
max_active_runs=3)
Sql_dq_result = SSHOperator(
ssh_hook=sshHook,
task_id='Sql_dq_result',
command='python /data/airflow/etl/PDM/{{params.my_param}}.py {{ ds_nodash }} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"sql_dq_result"},
depends_on_past=False,
retries=3,
dag=dag)
task_failed = EmailOperator (
dag=dag,
trigger_rule=TriggerRule.ONE_FAILED,
task_id="dq检查_failed",
to=["${etlEmail}"],
cc=[""],
subject="dq检查_failed",
html_content='<h3>dq检查_failed" </h3>')
Sql_dq_result >> task_failed

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,51 @@
#!/usr/bin/python
# -*- encoding=utf-8 -*-
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
import json
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
sshHook = SSHHook(ssh_conn_id ='ssh_85_air')
default_args = {
'owner': 'sewp_dev',
'email': [],
'start_date': datetime(2022, 9, 12),
'depends_on_past': False,
'retries': 6,
'retry_delay': timedelta(minutes=10),
}
dag = DAG('Dag_Sql_dq_result_detail_cust_info', default_args=default_args,
schedule_interval="${cronExp}",
catchup=False,
dagrun_timeout=timedelta(minutes=160),
max_active_runs=3)
Sql_dq_result_detail_cust_info = SSHOperator(
ssh_hook=sshHook,
task_id='Sql_dq_result_detail_cust_info',
command='python /data/airflow/etl/PDM/{{params.my_param}}.py {{ ds_nodash }} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"sql_dq_result_detail_cust_info"},
depends_on_past=False,
retries=3,
dag=dag)
task_failed = EmailOperator (
dag=dag,
trigger_rule=TriggerRule.ONE_FAILED,
task_id="客户dq检查结果明细_failed",
to=["${etlEmail}"],
cc=[""],
subject="客户dq检查结果明细_failed",
html_content='<h3>客户dq检查结果明细_failed" </h3>')
Sql_dq_result_detail_cust_info >> task_failed

View File

@ -0,0 +1,638 @@
/********************************************************************************************/
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons1_1'
AND CHECK_TYPE_CD = 'Cons1'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons1_1' AS Check_Id
,'Cons1' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y'
and mobile_phone_availability = '1';
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons1_2'
AND CHECK_TYPE_CD = 'Cons1'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons1_2' AS Check_Id
,'Cons1' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y'
and email_availability = '1';
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons1_3'
AND CHECK_TYPE_CD = 'Cons1'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons1_3' AS Check_Id
,'Cons1' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y'
and ((full_name ~ '^[\u4e00-\u9fa5]{1}\.[\u4e00-\u9fa5]{1,2}$')
OR
(full_name ~ '^[\u4e00-\u9fa5]{2,4}\.$'));
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons1_4'
AND CHECK_TYPE_CD = 'Cons1'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons1_4' AS Check_Id
,'Cons1' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y'
and (company ~ '^[\u4e00-\u9fa5]+公司$'
OR company ~ '^[\u4e00-\u9fa5]+集团$'
OR company ~ '^[\u4e00-\u9fa5]+大学$') AND LENGTH(company) >= 4;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Integ10_1'
AND CHECK_TYPE_CD = 'Integ10'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Integ10_1' AS Check_Id
,'Integ10' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y'
and (mobile = ''
or mobile is NULL);
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Integ10_2'
AND CHECK_TYPE_CD = 'Integ10'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Integ10_2' AS Check_Id
,'Integ10' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y'
and (email = ''
or email is NULL);
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Integ10_3'
AND CHECK_TYPE_CD = 'Integ10'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Integ10_3' AS Check_Id
,'Integ10' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y'
and (full_name = ''
or full_name is NULL
or full_name = '0');
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Integ10_4'
AND CHECK_TYPE_CD = 'Integ10'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Integ10_4' AS Check_Id
,'Integ10' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y'
and (company = ''
or company is NULL);
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Integ10_5'
AND CHECK_TYPE_CD = 'Integ10'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Integ10_5' AS Check_Id
,'Integ10' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y'
and (prov_name = ''
or prov_name is NULL);
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Integ10_6'
AND CHECK_TYPE_CD = 'Integ10'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Integ10_6' AS Check_Id
,'Integ10' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y'
and (city_name = ''
or city_name is NULL);
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Integ10_7'
AND CHECK_TYPE_CD = 'Integ10'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Integ10_7' AS Check_Id
,'Integ10' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y'
and (industry = ''
or industry is NULL);
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_1'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_1' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y' and contact_channel like 'CRM%'
and (company <> ''
and account <> ''
AND company = account);
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_2'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_2' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
WHERE active_ind = '3Y' and contact_channel like 'CRM%'
and (company <> account);
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_5'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_5' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
left join (
SELECT
P1.crm_contact_account,
P1.company_name AS crm_company,
P1.account_name AS crm_account,
P1.contact_id AS CRM_CONTACT_ID,
P2.scrm_leads_id,
P2.company_name AS scrm_company,
P2.contact_id AS SCRM_CONTACT_ID
FROM (
SELECT
P1.crm_contact_account,
P1.company_name,
P1.account_name,
P2.contact_id
FROM
p30_common.d_crm_contact P1
LEFT JOIN
p30_common.v_cust_contact_mapping P2
ON
P1.crm_contact_account = P2.contact
) AS P1
LEFT JOIN (
SELECT
P1.scrm_leads_id,
P1.company_name,
P2.contact_id
FROM
p30_common.d_scrm_contact P1
LEFT JOIN
p30_common.v_cust_contact_mapping P2
ON
P1.scrm_leads_id = P2.contact
) AS P2
ON
P1.contact_id = P2.contact_id
WHERE
P1.company_name = P2.company_name
AND
P1.account_name = P2.company_name
and P1.company_name <> ''
and P1.account_name <> ''
and P2.company_name <> ''
) p2
on p1.contact_id=p2.crm_contact_id
WHERE p1.active_ind = '3Y'
and (p2.crm_contact_id is not null) ;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_6'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_6' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'v_cust_all_info' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,row_to_json(p1)
,'' AS Remark
FROM p60_mart.v_cust_all_info p1
left join (
SELECT
P1.crm_contact_account,
P1.company_name AS crm_company,
P1.account_name AS crm_account,
P1.contact_id AS CRM_CONTACT_ID,
P2.scrm_leads_id,
P2.company_name AS scrm_company,
P2.contact_id AS SCRM_CONTACT_ID
FROM (
SELECT
P1.crm_contact_account,
P1.company_name,
P1.account_name,
P2.contact_id
FROM
p30_common.d_crm_contact P1
LEFT JOIN
p30_common.v_cust_contact_mapping P2
ON
P1.crm_contact_account = P2.contact
) AS P1
LEFT JOIN (
SELECT
P1.scrm_leads_id,
P1.company_name,
P2.contact_id
FROM
p30_common.d_scrm_contact P1
LEFT JOIN
p30_common.v_cust_contact_mapping P2
ON
P1.scrm_leads_id = P2.contact
) AS P2
ON
P1.contact_id = P2.contact_id
WHERE
P1.company_name <> P2.company_name
AND
P1.account_name <> P2.company_name
) p2
on p1.contact_id=p2.crm_contact_id
WHERE p1.active_ind = '3Y'
and (p2.crm_contact_id is not null);
\q

View File

@ -0,0 +1,51 @@
#!/usr/bin/python
# -*- encoding=utf-8 -*-
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
import json
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
sshHook = SSHHook(ssh_conn_id ='ssh_85_air')
default_args = {
'owner': 'sewp_dev',
'email': [],
'start_date': datetime(2022, 9, 12),
'depends_on_past': False,
'retries': 6,
'retry_delay': timedelta(minutes=10),
}
dag = DAG('Dag_Sql_dq_check_contact', default_args=default_args,
schedule_interval="${cronExp}",
catchup=False,
dagrun_timeout=timedelta(minutes=160),
max_active_runs=3)
Sql_dq_check_contact = SSHOperator(
ssh_hook=sshHook,
task_id='Sql_dq_check_contact',
command='python /data/airflow/etl/PDM/{{params.my_param}}.py {{ ds_nodash }} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"sql_dq_check_contact"},
depends_on_past=False,
retries=3,
dag=dag)
task_failed = EmailOperator (
dag=dag,
trigger_rule=TriggerRule.ONE_FAILED,
task_id="联系信息dq检查_failed",
to=["${etlEmail}"],
cc=[""],
subject="联系信息dq检查_failed",
html_content='<h3>联系信息dq检查_failed" </h3>')
Sql_dq_check_contact >> task_failed

View File

@ -0,0 +1,797 @@
/********************************************************************************************/
/*******Main Section**************************************************************************/
\set ON_ERROR_STOP on
\set AUTOCOMMIT on
\timing on
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_7'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_7' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p1.crm_contact_account
,p2.scrm_leads_id
,'' livechat_leads_id
,p1.email CRM_EMAIL
,p2.email SCRM_EMAIL
,'' LIVECHAT_EMAIL
,p1.mobile_phone CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,'' LIVECHAT_MOBILE_PHONE
FROM p30_common.d_crm_contact p1
left join p30_common.d_scrm_contact p2
on p1.mobile_phone=p2.mobile_phone
and p2.mobile_phone<> ''
where p2.mobile_phone is not null
and (p1.email <> p2.email ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_8'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_8' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p1.crm_contact_account
,p2.scrm_leads_id
,'' livechat_leads_id
,p1.email CRM_EMAIL
,p2.email SCRM_EMAIL
,'' LIVECHAT_EMAIL
,p1.mobile_phone CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,'' LIVECHAT_MOBILE_PHONE
FROM p30_common.d_crm_contact p1
left join p30_common.d_scrm_contact p2
on p1.email=p2.email
and p2.email<> ''
where p2.email is not null
and (p1.mobile_phone <> p2.mobile_phone ) )P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_9'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_9' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p1.crm_contact_account
,p2.scrm_leads_id
,'' livechat_leads_id
,p1.email CRM_EMAIL
,p2.email SCRM_EMAIL
,'' LIVECHAT_EMAIL
,p1.mobile_phone CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,'' LIVECHAT_MOBILE_PHONE
FROM p30_common.d_crm_contact p1
left join p30_common.d_scrm_contact p2
on p1.mobile_phone=p2.mobile_phone
and p2.mobile_phone<> ''
and p1.email_availability = '1'
and p2.email_availability = '1'
where p2.mobile_phone is not null
and (p1.email <> p2.email ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_10'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_10' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p1.crm_contact_account
,p2.scrm_leads_id
,'' livechat_leads_id
,p1.email CRM_EMAIL
,p2.email SCRM_EMAIL
,'' LIVECHAT_EMAIL
,p1.mobile_phone CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,'' LIVECHAT_MOBILE_PHONE
FROM p30_common.d_crm_contact p1
left join p30_common.d_scrm_contact p2
on p1.email=p2.email
and p2.email<> ''
and p1.mobile_phone_availability = '1'
and p2.mobile_phone_availability = '1'
where p2.email is not null
and (p1.mobile_phone <> p2.mobile_phone ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_11'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_11' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p1.crm_contact_account
,p2.scrm_leads_id
,p3.livechat_leads_id
,p1.email CRM_EMAIL
,p2.email SCRM_EMAIL
,P3.email LIVECHAT_EMAIL
,p1.mobile_phone CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,P3.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_crm_contact p1
left join p30_common.d_scrm_contact p2
on p1.mobile_phone=p2.mobile_phone
and p2.mobile_phone<> ''
LEFT JOIN p30_common.d_livechat_contact p3
on p1.mobile_phone = p3.mobile_phone
AND p2.mobile_phone = p3.mobile_phone
where p2.mobile_phone is not null
and (p1.email <> p2.email
or p1.email <> p3.email
or p2.email <> p3.email ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_12'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_12' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p1.crm_contact_account
,p2.scrm_leads_id
,p3.livechat_leads_id
,p1.email CRM_EMAIL
,p2.email SCRM_EMAIL
,P3.email LIVECHAT_EMAIL
,p1.mobile_phone CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,P3.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_crm_contact p1
left join p30_common.d_scrm_contact p2
on p1.email=p2.email
and p2.email<> ''
LEFT JOIN p30_common.d_livechat_contact p3
on p1.email = p3.email
AND p2.email = p3.email
where p2.email is not null
and (p1.mobile_phone <> p2.mobile_phone
or p1.mobile_phone <> p3.mobile_phone
or p2.mobile_phone <> p3.mobile_phone))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_13'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_13' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p1.crm_contact_account
,p2.scrm_leads_id
,p3.livechat_leads_id
,p1.email CRM_EMAIL
,p2.email SCRM_EMAIL
,P3.email LIVECHAT_EMAIL
,p1.mobile_phone CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,P3.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_crm_contact p1
left join p30_common.d_scrm_contact p2
on p1.mobile_phone=p2.mobile_phone
and p2.mobile_phone<> ''
LEFT JOIN p30_common.d_livechat_contact p3
on p1.mobile_phone = p3.mobile_phone
AND p2.mobile_phone = p3.mobile_phone
where p2.mobile_phone is not null
and p1.email_availability = '1'
and p2.email_availability = '1'
and p3.email_availability = '1'
and (p1.email <> p2.email
or p1.email <> p3.email
or p2.email <> p3.email ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_14'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_14' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p1.crm_contact_account
,p2.scrm_leads_id
,p3.livechat_leads_id
,p1.email CRM_EMAIL
,p2.email SCRM_EMAIL
,P3.email LIVECHAT_EMAIL
,p1.mobile_phone CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,P3.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_crm_contact p1
left join p30_common.d_scrm_contact p2
on p1.email=p2.email
and p2.email<> ''
LEFT JOIN p30_common.d_livechat_contact p3
on p1.email = p3.email
AND p2.email = p3.email
where p2.email is not null
and p1.mobile_phone_availability = '1'
and p2.mobile_phone_availability = '1'
and p3.mobile_phone_availability = '1'
and (p1.mobile_phone <> p2.mobile_phone
or p1.mobile_phone <> p3.mobile_phone
or p2.mobile_phone <> p3.mobile_phone))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_15'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_15' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT '' crm_contact_account
,p2.scrm_leads_id
,p1.livechat_leads_id
,'' CRM_EMAIL
,p2.email SCRM_EMAIL
,P1.email LIVECHAT_EMAIL
,'' CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,P1.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_livechat_contact p1
left join p30_common.d_scrm_contact p2
on p1.mobile_phone=p2.mobile_phone
and p2.mobile_phone<> ''
where p2.mobile_phone is not null
and (p1.email <> p2.email ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_16'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_16' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT '' crm_contact_account
,p2.scrm_leads_id
,p1.livechat_leads_id
,'' CRM_EMAIL
,p2.email SCRM_EMAIL
,P1.email LIVECHAT_EMAIL
,'' CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,P1.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_livechat_contact p1
left join p30_common.d_scrm_contact p2
on p1.email=p2.email
and p2.email<> ''
where p2.email is not null
and (p1.mobile_phone <> p2.mobile_phone ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_17'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_17' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT '' crm_contact_account
,p2.scrm_leads_id
,p1.livechat_leads_id
,'' CRM_EMAIL
,p2.email SCRM_EMAIL
,P1.email LIVECHAT_EMAIL
,'' CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,P1.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_livechat_contact p1
left join p30_common.d_scrm_contact p2
on p1.mobile_phone=p2.mobile_phone
and p2.mobile_phone<> ''
and p1.email_availability = '1'
and p2.email_availability = '1'
where p2.mobile_phone is not null
and (p1.email <> p2.email ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_18'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_18' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT '' crm_contact_account
,p2.scrm_leads_id
,p1.livechat_leads_id
,'' CRM_EMAIL
,p2.email SCRM_EMAIL
,P1.email LIVECHAT_EMAIL
,'' CRM_MOBILE_PHONE
,p2.mobile_phone SCRM_MOBILE_PHONE
,P1.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_livechat_contact p1
left join p30_common.d_scrm_contact p2
on p1.email=p2.email
and p2.email<> ''
and p1.mobile_phone_availability = '1'
and p2.mobile_phone_availability = '1'
where p2.email is not null
and (p1.mobile_phone <> p2.mobile_phone ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_19'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_19' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p2.crm_contact_account
,'' scrm_leads_id
,p1.livechat_leads_id
,p2.email CRM_EMAIL
,'' SCRM_EMAIL
,P1.email LIVECHAT_EMAIL
,p2.mobile_phone CRM_MOBILE_PHONE
,'' SCRM_MOBILE_PHONE
,P1.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_livechat_contact p1
left join p30_common.d_crm_contact p2
on p1.mobile_phone=p2.mobile_phone
and p2.mobile_phone<> ''
where p2.mobile_phone is not null
and (p1.email <> p2.email ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_20'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_20' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p2.crm_contact_account
,'' scrm_leads_id
,p1.livechat_leads_id
,p2.email CRM_EMAIL
,'' SCRM_EMAIL
,P1.email LIVECHAT_EMAIL
,p2.mobile_phone CRM_MOBILE_PHONE
,'' SCRM_MOBILE_PHONE
,P1.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_livechat_contact p1
left join p30_common.d_crm_contact p2
on p1.email=p2.email
and p2.email<> ''
where p2.email is not null
and (p1.mobile_phone <> p2.mobile_phone ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_21'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_21' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p2.crm_contact_account
,'' scrm_leads_id
,p1.livechat_leads_id
,p2.email CRM_EMAIL
,'' SCRM_EMAIL
,P1.email LIVECHAT_EMAIL
,p2.mobile_phone CRM_MOBILE_PHONE
,'' SCRM_MOBILE_PHONE
,P1.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_livechat_contact p1
left join p30_common.d_crm_contact p2
on p1.mobile_phone=p2.mobile_phone
and p2.mobile_phone<> ''
and p1.email_availability = '1'
and p2.email_availability = '1'
where p2.mobile_phone is not null
and (p1.email <> p2.email ))P1;
DELETE FROM p51_dqc.DQ_CHECK_RESULT_DETAIL
WHERE CHECK_ID ='Cons22_22'
AND CHECK_TYPE_CD = 'Cons22'
AND Check_Run_Date = CURRENT_DATE;
INSERT INTO p51_dqc.DQ_CHECK_RESULT_DETAIL(
CHECK_ID
,CHECK_TYPE_CD
,CHECK_RUN_DATE
,SRC_SYS_BRIEF_NAME
,SRC_DB_NAME
,SRC_SCHEMA_NAME
,SRC_TABLE_NAME
,SRC_COLUMN_NAME
,SRC_TABLE_EXCPT_REC
,DATA_DETAIL
,REMARK
)
SELECT 'Cons22_22' AS Check_Id
,'Cons22' AS Check_Type_Cd
,CURRENT_DATE AS Check_Run_Date
,'TDVM' AS Src_Sys_Brief_Name
,'p51_dqc' AS Src_DB_Name
,'' AS Src_Schema_Name
,'contact' AS Src_Table_Name
,'dismm' AS Src_Column_Name
,1 AS Src_Table_Total_Rec
,ROW_TO_JSON(P1)
,'' AS Remark
FROM ( SELECT p2.crm_contact_account
,'' scrm_leads_id
,p1.livechat_leads_id
,p2.email CRM_EMAIL
,'' SCRM_EMAIL
,P1.email LIVECHAT_EMAIL
,p2.mobile_phone CRM_MOBILE_PHONE
,'' SCRM_MOBILE_PHONE
,P1.mobile_phone LIVECHAT_MOBILE_PHONE
FROM p30_common.d_livechat_contact p1
left join p30_common.d_crm_contact p2
on p1.email=p2.email
and p2.email<> ''
and p1.mobile_phone_availability = '1'
and p2.mobile_phone_availability = '1'
where p2.email is not null
and (p1.mobile_phone <> p2.mobile_phone ))P1;
\q

View File

@ -246,6 +246,42 @@ params={'my_param':"cust_enagement_records_agi"},
depends_on_past=False,
retries=3,
dag=dag)
dysql_dq_result_detail_cust_info = SSHOperator(
ssh_hook=sshHook,
task_id='dysql_dq_result_detail_cust_info',
command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"dysql_dq_result_detail_cust_info"},
depends_on_past=False,
retries=3,
dag=dag)
dysql_dq_result = SSHOperator(
ssh_hook=sshHook,
task_id='dysql_dq_result',
command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"dysql_dq_result"},
depends_on_past=False,
retries=3,
dag=dag)
dysql_dq_check_contact = SSHOperator(
ssh_hook=sshHook,
task_id='dysql_dq_check_contact',
command='/data/airflow/etl/MART/run_psql.sh {{ ds_nodash }} {{params.my_param}} >>/data/airflow/logs/run_tpt_{{ds_nodash}}.log 2>&1 ',
params={'my_param':"dysql_dq_check_contact"},
depends_on_past=False,
retries=3,
dag=dag)
customer_events_3292 >> customer_events_feign
customer_events_open_content_page_9684 >> custom_events_open_content_page_feign
customer_event_meta_2268 >> customer_event_meta_feign
@ -271,4 +307,7 @@ cust_all_info >> data_source_update
cust_all_info >> cust_leads_detail
cust_leads_detail >> cust_enagement_records
cust_enagement_records >> data_source_update
data_source_update >> task_failed
cust_enagement_records >> dysql_dq_result
dysql_dq_result >> dysql_dq_check_contact
dysql_dq_check_contact >> dysql_dq_result_detail_cust_info
dysql_dq_result_detail_cust_info >> task_failed