Apache Griffin core source code: DSL-to-SQL translation in measure

1. Introduction
In the measure source code, when BatchDQApp executes its run method (here taking batch processing of a Hive source database as the example), building the data-quality check job actually translates the user-configured DSL rules into the corresponding SQL, which is then executed:
// build the data-quality comparison job
val dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
Stepping into DQJobBuilder's buildDQJob method:
def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam]): DQJob = {
  // build steps by data sources
  val dsSteps = context.dataSources.flatMap { dataSource =>
    DQStepBuilder.buildStepOptByDataSourceParam(context, dataSource.dsParam)
  }
  // build steps by rules
  // For an accuracy rule, the resulting steps look like:
  // SeqDQStep(List(
  //   SparkSqlTransformStep(__missRecords,
  //     SELECT `source`.* FROM `source` LEFT JOIN `target`
  //       ON coalesce(`source`.`user_id`, '') = coalesce(`target`.`user_id`, '')
  //       AND upper(`source`.`first_name`) = upper(`target`.`first_name`)
  //       AND coalesce(`source`.`last_name`, '') = coalesce(`target`.`last_name`, '')
  //       AND coalesce(`source`.`address`, '') = coalesce(`target`.`address`, '')
  //       AND coalesce(`source`.`email`, '') = coalesce(`target`.`email`, '')
  //       AND coalesce(`source`.`phone`, '') = coalesce(`target`.`phone`, '')
  //       AND coalesce(`source`.`post_code`, '') = coalesce(`target`.`post_code`, '')
  //     WHERE (NOT (`source`.`user_id` IS NULL AND `source`.`first_name` IS NULL
  //       AND `source`.`last_name` IS NULL AND `source`.`address` IS NULL
  //       AND `source`.`email` IS NULL AND `source`.`phone` IS NULL
  //       AND `source`.`post_code` IS NULL))
  //       AND (`target`.`user_id` IS NULL AND `target`.`first_name` IS NULL
  //       AND `target`.`last_name` IS NULL AND `target`.`address` IS NULL
  //       AND `target`.`email` IS NULL AND `target`.`phone` IS NULL
  //       AND `target`.`post_code` IS NULL), Map(), true),
  //   SparkSqlTransformStep(__missCount,
  //     SELECT COUNT(*) AS `miss` FROM `__missRecords`, Map(), false),
  //   SparkSqlTransformStep(__totalCount,
  //     SELECT COUNT(*) AS `total` FROM `source`, Map(), false),
  //   SparkSqlTransformStep(accu,
  //     SELECT A.total AS `total`,
  //            A.miss AS `miss`,
  //            (A.total - A.miss) AS `matched`,
  //            coalesce( (A.total - A.miss) / A.total, 1.0) AS `matchedFraction`
  //     FROM (
  //       SELECT `__totalCount`.`total` AS total,
  //              coalesce(`__missCount`.`miss`, 0) AS miss
  //       FROM `__totalCount` LEFT JOIN `__missCount`
  //     ) AS A, Map(), false),
  //   MetricWriteStep(accu,accu,DefaultFlattenType,None),
  //   RecordWriteStep(__missRecords,__missRecords,None,None)))
  val ruleSteps = ruleParams.flatMap { ruleParam =>
    DQStepBuilder.buildStepOptByRuleParam(context, ruleParam)
  }
  // metric flush step
  val metricFlushStep = MetricFlushStep()
  /**
   * ++ concatenates two collections
   * :+ appends an element to the end of a collection
   * +: prepends an element to the head of a collection
   */
  DQJob(dsSteps ++ ruleSteps :+ metricFlushStep)
}
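The step assembly at the end of buildDQJob is plain Scala collection concatenation. A minimal Python sketch of the same ordering (the step names here are illustrative placeholders, not Griffin's actual step objects):

```python
# Sketch of DQJob step assembly: data-source steps first, then rule steps,
# then a single metric-flush step appended at the end.
ds_steps = ["source_read", "target_read"]          # from data source params
rule_steps = ["__missRecords", "__missCount",      # from translated DSL rules
              "__totalCount", "accu"]
metric_flush_step = "metric_flush"

# Scala: dsSteps ++ ruleSteps :+ metricFlushStep
dq_job_steps = ds_steps + rule_steps + [metric_flush_step]
print(dq_job_steps[-1])  # the flush step always runs last
```

Whatever rules are configured, the metric flush step is always the final element, so metrics are emitted only after every transform step has run.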
Executing
DQStepBuilder.buildStepOptByRuleParam(context, ruleParam)
eventually lands in GriffinDslDQStepBuilder's buildSteps method:
def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
  val name = getStepName(ruleParam.getOutDfName())
  val rule = ruleParam.getRule
  val dqType = ruleParam.getDqType
  try {
    val result = parser.parseRule(rule, dqType)
    if (result.successful) {
      val expr = result.get
      val expr2DQSteps = Expr2DQSteps(context, expr, ruleParam.replaceOutDfName(name))
      expr2DQSteps.getDQSteps()
    } else {
      warn(s"parse rule [ ${rule} ] fails: \n${result}")
      Nil
    }
  } catch {
    case e: Throwable =>
      error(s"generate rule plan ${name} fails: ${e.getMessage}", e)
      Nil
  }
}
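The control flow above (parse the rule, translate on success, fall back to an empty step list on failure or exception) can be mirrored in a small Python sketch; build_steps and toy_parse are hypothetical stand-ins for illustration, not Griffin APIs:

```python
def build_steps(rule, parse):
    """Mirror of buildSteps' control flow: translate a rule to steps,
    returning an empty list when parsing or planning fails."""
    try:
        ok, expr = parse(rule)
        if not ok:
            print(f"parse rule [ {rule} ] fails")
            return []
        return [f"step:{expr}"]   # stand-in for Expr2DQSteps(...).getDQSteps()
    except Exception as e:
        print(f"generate rule plan fails: {e}")
        return []

# A toy parser that only accepts rules mentioning both source and target.
toy_parse = lambda r: ("source" in r and "target" in r, r)
print(build_steps("source.id = target.id", toy_parse))
print(build_steps("nonsense", toy_parse))  # prints a warning, returns []
```

The important property is that a bad rule never aborts the job build: it simply contributes no steps, matching the Nil returns in the Scala code.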
2. Apache Griffin DSL
Official documentation:
Griffin DSL is a SQL-like syntax used for DQ measurement; it describes problems in the DQ domain.
In practice, Griffin takes the Griffin DSL rules and translates them into Spark SQL rules so they can be computed by the Spark SQL engine.
The DQ domain has multiple dimensions, and each needs to be translated in its own way.
Accuracy
For accuracy, we need the match count between source and target; the rule describes the mapping between the data sources. Griffin translates one DSL rule into several SQL rules.
For example, the DSL rule "source.id = target.id and source.name = target.name" expresses the match condition for accuracy. After translation, the SQL rules are:
Get the miss items from source: SELECT source.* FROM source LEFT JOIN target ON coalesce(source.id, '') = coalesce(target.id, '') and coalesce(source.name, '') = coalesce(target.name, '') WHERE (NOT (source.id IS NULL AND source.name IS NULL)) AND (target.id IS NULL AND target.name IS NULL), save as table miss_items.
Get the miss count: SELECT COUNT(*) AS miss FROM miss_items, save as table miss_count.
Get the total count from source: SELECT COUNT(*) AS total FROM source, save as table total_count.
Get the accuracy metric: SELECT miss_count.miss AS miss, total_count.total AS total, (total_count.total - miss_count.miss) AS matched FROM miss_count FULL JOIN total_count, save as table accuracy.
After translation, the metric is saved in table accuracy.
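The four accuracy steps can be replayed on toy data. The sketch below uses SQLite in place of Spark SQL; the final FULL JOIN becomes a CROSS JOIN, which is equivalent here since both metric tables hold exactly one row:

```python
import sqlite3

# Toy run of the four accuracy SQL steps on an in-memory database.
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE source(id INTEGER, name TEXT)")
cur.execute("CREATE TABLE target(id INTEGER, name TEXT)")
cur.executemany("INSERT INTO source VALUES (?, ?)",
                [(1, "a"), (2, "b"), (3, "c")])
cur.executemany("INSERT INTO target VALUES (?, ?)",
                [(1, "a"), (2, "b")])              # (3, 'c') is missing

# step 1: miss items
cur.execute("""
  CREATE TABLE miss_items AS
  SELECT source.* FROM source LEFT JOIN target
    ON coalesce(source.id, '') = coalesce(target.id, '')
   AND coalesce(source.name, '') = coalesce(target.name, '')
 WHERE (NOT (source.id IS NULL AND source.name IS NULL))
   AND (target.id IS NULL AND target.name IS NULL)""")
# steps 2-4: miss count, total count, accuracy metric
cur.execute("CREATE TABLE miss_count AS SELECT COUNT(*) AS miss FROM miss_items")
cur.execute("CREATE TABLE total_count AS SELECT COUNT(*) AS total FROM source")
miss, total, matched = cur.execute("""
  SELECT miss_count.miss, total_count.total,
         (total_count.total - miss_count.miss) AS matched
    FROM miss_count CROSS JOIN total_count""").fetchone()
print(miss, total, matched)  # 1 3 2
```

The row (3, 'c') has no counterpart in target, so it is the single miss item; 2 of 3 source rows matched.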
Profiling
For profiling, the request is usually an aggregation over the data. The rule is largely the same as SQL, but only the select, from, where, group-by, having, order-by, and limit clauses are supported, which covers most profiling requests. For a complex request, a raw SQL rule can be used to describe it directly.
For example, the DSL rule "source.cntry, source.id.count(), source.age.max() group by source.cntry" expresses a profiling request. After translation, the SQL rule is:
SELECT source.cntry, count(source.id), max(source.age) FROM source GROUP BY source.cntry, save as table profiling.
After translation, the metric is saved in table profiling.
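A toy run of the profiling query, again using SQLite in place of Spark SQL (an ORDER BY is added only to make the output deterministic):

```python
import sqlite3

# Profiling translates to a single GROUP BY aggregation query.
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE source(id INTEGER, cntry TEXT, age INTEGER)")
cur.executemany("INSERT INTO source VALUES (?, ?, ?)",
                [(1, "US", 30), (2, "US", 40), (3, "CN", 25)])
rows = cur.execute("""
  SELECT cntry, count(id), max(age)
    FROM source GROUP BY cntry ORDER BY cntry""").fetchall()
print(rows)  # [('CN', 1, 25), ('US', 2, 40)]
```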
Distinctness
For distinctness, also called duplication, the goal is to find duplicate data items and roll up the item counts grouped by duplicate count.
For example, the DSL rule "name, age" expresses a duplication request; in this case, source and target are the same data set. After translation, the SQL rules are:
Get the total count: SELECT COUNT(*) AS total FROM source, save as table total_count.
Group by the selected fields: SELECT name, age, (COUNT(*) - 1) AS dup, TRUE AS dist FROM source GROUP BY name, age, save as table dup_count.
Distinct metric: SELECT COUNT(*) AS dist_count FROM dup_count WHERE dist, save as table distinct_metric.
Join source with the distinct metric table: SELECT source.*, dup_count.dup AS dup, dup_count.dist AS dist FROM source LEFT JOIN dup_count ON source.name = dup_count.name AND source.age = dup_count.age, save as table dist_joined.
Add row numbers: SELECT *, ROW_NUMBER() OVER (DISTRIBUTE BY name, age SORT BY dist) row_num FROM dist_joined, save as table row_numbered.
Duplicate records: SELECT name, age, dup FROM row_numbered WHERE NOT dist OR row_num > 1, save as table dup_records.
Duplicate metric: SELECT name, age, dup, COUNT(*) AS num FROM dup_records GROUP BY name, age, dup, save as table dup_metric.
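A condensed replay of the distinctness steps on toy data, using SQLite in place of Spark SQL. Spark's DISTRIBUTE BY ... SORT BY is approximated with the standard PARTITION BY ... ORDER BY window syntax, and the intermediate dist_joined table is folded into the row-numbering query:

```python
import sqlite3

# Toy run: "a"/1 appears three times, "b"/2 once.
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE source(name TEXT, age INTEGER)")
cur.executemany("INSERT INTO source VALUES (?, ?)",
                [("a", 1), ("a", 1), ("a", 1), ("b", 2)])

# group by the selected fields: dup = occurrences beyond the first
cur.execute("""CREATE TABLE dup_count AS
  SELECT name, age, (COUNT(*) - 1) AS dup, TRUE AS dist
    FROM source GROUP BY name, age""")
dist_count, = cur.execute(
    "SELECT COUNT(*) AS dist_count FROM dup_count WHERE dist").fetchone()

# join back to source and number the rows within each (name, age) group
cur.execute("""CREATE TABLE row_numbered AS
  SELECT s.name, s.age, d.dup, d.dist,
         ROW_NUMBER() OVER (PARTITION BY s.name, s.age ORDER BY d.dist) AS row_num
    FROM source s LEFT JOIN dup_count d
      ON s.name = d.name AND s.age = d.age""")

# duplicate metric: count the records beyond each group's first occurrence
dup_metric = cur.execute("""
  SELECT name, age, dup, COUNT(*) AS num
    FROM row_numbered WHERE NOT dist OR row_num > 1
   GROUP BY name, age, dup""").fetchall()
print(dist_count, dup_metric)  # 2 [('a', 1, 2, 2)]
```

Two distinct (name, age) combinations exist, and ('a', 1) contributes two duplicate records beyond its first occurrence.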
Completeness
For completeness, NULL values are checked: if the measured columns are NULL, the row is incomplete.
Get the total count: SELECT COUNT(*) AS total FROM source, save as table total_count.
Incomplete metric: SELECT COUNT(*) AS incomplete FROM source WHERE NOT (id IS NOT NULL), save as table incomplete_count.
Complete metric: SELECT (total_count.total - incomplete_count.incomplete) AS complete FROM total_count LEFT JOIN incomplete_count, save as table complete_count.
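A toy run of the completeness steps in SQLite (CROSS JOIN stands in for the LEFT JOIN between the two single-row metric tables):

```python
import sqlite3

# Count rows where the measured column is NULL, then derive the complete count.
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE source(id INTEGER)")
cur.executemany("INSERT INTO source VALUES (?)", [(1,), (None,), (3,), (None,)])

cur.execute("CREATE TABLE total_count AS SELECT COUNT(*) AS total FROM source")
cur.execute("""CREATE TABLE incomplete_count AS
  SELECT COUNT(*) AS incomplete FROM source WHERE NOT (id IS NOT NULL)""")
complete, = cur.execute("""
  SELECT (total_count.total - incomplete_count.incomplete) AS complete
    FROM total_count CROSS JOIN incomplete_count""").fetchone()
print(complete)  # 2
```

Of four rows, two have a NULL id, leaving two complete rows.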
Timeliness
For timeliness, the latency of each item is measured and latency statistics are produced.
For example, the DSL rule is "ts, out_ts": the first column is the item's input time and the second its output time; if not set, "__tmst" is the default output time column. After translation, the SQL rules are:
Get the input and output time columns: SELECT *, ts AS _bts, out_ts AS _ets FROM source, save as table origin_time.
Get the latency of each item: SELECT *, (_ets - _bts) AS latency FROM origin_time, save as table lat.
Timeliness metric: SELECT CAST(AVG(latency) AS BIGINT) AS avg, MAX(latency) AS max, MIN(latency) AS min FROM lat, save as table time_metric.
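A toy run of the timeliness steps in SQLite (BIGINT becomes INTEGER in the cast; timestamps are in milliseconds):

```python
import sqlite3

# Per-item latency, then avg/max/min statistics over all items.
con = sqlite3.connect(":memory:")
cur = con.cursor()
cur.execute("CREATE TABLE source(ts INTEGER, out_ts INTEGER)")
cur.executemany("INSERT INTO source VALUES (?, ?)",
                [(1000, 1300), (2000, 2100), (3000, 3500)])

cur.execute("""CREATE TABLE origin_time AS
  SELECT *, ts AS _bts, out_ts AS _ets FROM source""")
cur.execute("""CREATE TABLE lat AS
  SELECT *, (_ets - _bts) AS latency FROM origin_time""")
avg, mx, mn = cur.execute("""
  SELECT CAST(AVG(latency) AS INTEGER) AS avg,
         MAX(latency) AS max, MIN(latency) AS min FROM lat""").fetchone()
print(avg, mx, mn)  # 300 500 100
```

The three item latencies are 300, 100, and 500 ms, giving the average, maximum, and minimum above.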

Published: 2024-09-23 05:26:43.
Link: https://www.17tex.com/tex/3/378233.html