Spark原生大数据迁移数据校验框架设计与实践

发布时间:2026/6/12 17:39:28
Spark原生大数据迁移数据校验框架设计与实践 1. 项目概述为什么在大数据迁移中数据校验不能只靠“跑通就完事”我做过不下二十个跨平台大数据迁移项目从传统数仓迁到云原生湖仓从 Hive 迁到 Delta Lake从本地 HDFS 迁到对象存储。每次客户最常问的一句话不是“多久能上线”而是“你们怎么保证数据没丢、没乱、没算错”——这句话背后是几十TB甚至PB级数据在传输、转换、落地过程中可能发生的隐性腐化字段截断、时区偏移、空值语义漂移、精度丢失、分区路径错位、小文件爆炸导致统计偏差……这些都不是任务报错能拦住的它们安静地躺在结果表里直到某天财务对账差了37分或者风控模型突然把好人标成高风险。这篇文章讲的就是一个我在真实生产环境反复打磨、迭代了四轮的Apache Spark 数据校验框架它不是简单的 count(*) 对比也不是写几行 SQL 手动抽样检查。它是一套可嵌入 ETL 流程、支持多层级断言、能自动生成差异报告、且与业务语义强绑定的验证体系。核心关键词就是Data Validation Framework、Apache Spark、Big Data Migration Workloads。它专为迁移场景设计——因为迁移不是一次性的“搬箱子”而是一场需要全程监控、逐层确认、责任可追溯的精密手术。适合正在做或即将做大数据平台升级、云迁移、架构重构的工程师、数据平台负责人、质量保障同学。如果你还在用 Excel 手工比对源目表字段、靠肉眼扫日志查异常、或者把校验逻辑硬编码在每个作业末尾那这篇内容就是为你写的——它不教你 Spark 基础语法但会告诉你怎么让 Spark 成为你数据质量的“守门员”而不是“搬运工”。这个框架的底层逻辑很朴素迁移的本质是承诺的兑现。你向业务方承诺“所有订单记录完整迁移”那就得定义清楚“完整”是什么——是行数一致主键无缺失金额总和误差0.001%还是关键业务字段如 order_status、payment_method分布比例偏差2%这个框架强制你把模糊的“完整”翻译成可执行、可量化、可回溯的校验规则。它不替代单元测试也不取代数据血缘治理但它是在迁移流水线最后一道闸口上那个必须亮起绿灯才能放行的信号灯。我见过太多项目卡在“数据到底对不对”这个环节上反复回滚、加班核对、互相甩锅。这套框架的目的就是把“我觉得差不多”变成“系统证明它达标”。2. 整体设计思路为什么不用现成工具而要自己搭一套校验框架2.1 现有方案的三大硬伤通用性、语义性、可嵌入性全都不够刚接手第一个大型迁移项目时我也试过直接用现成方案。比如用 Great ExpectationsGE它确实强大规则丰富报告漂亮。但很快发现三个致命问题第一通用校验器无法理解业务语义。GE 的expect_column_values_to_not_be_null可以检查非空但它不知道“用户手机号”字段在业务上允许为空注册时未填而“订单ID”字段绝对不能为空。它更不会知道“支付时间”字段的合法范围应该是“订单创建时间之后、发货时间之前”。这些规则必须由人来定义而 GE 的 DSL 虽然灵活却要求你为每张表、每个字段单独写 YAML当面对上百张核心表时维护成本指数级上升且极易遗漏。第二校验与执行环境割裂。GE 是 Python 主导而我们的迁移作业 95% 是用 Scala 写的 Spark 应用运行在 YARN 或 Kubernetes 上。引入 GE 意味着要额外部署 Python 环境、管理依赖冲突、处理序列化问题比如 Spark DataFrame 和 Pandas DataFrame 互转的性能损耗。一次校验要先 collect 到 driver 再交给 GE 处理对于亿级表光是 collect 就能把 driver 内存打爆。这不是优化是添堵。第三无法深度嵌入迁移流水线。我们希望校验不是迁移完成后的“补考”而是每个关键步骤后的“随堂测验”。比如原始数据读取后校验源端连接是否稳定、字段类型是否符合预期ETL 转换后校验关键指标聚合逻辑是否正确写入目标表前校验分区字段是否合规、数据倾斜是否可控。现有工具大多是“事后批处理”无法作为 Spark 作业的一个 stage 原生执行也无法共享同一个 SparkContext 的资源和上下文。提示不要迷信“开箱即用”。在大数据领域所谓“通用框架”往往在复杂业务场景下通用性越强定制性就越弱最终反而成为流程中的瓶颈点。2.2 我们的设计哲学轻量、声明式、Spark 原生、业务驱动基于以上教训我们决定自己构建一个极简但精准的校验框架。它的设计锚点非常明确轻量Lightweight不引入新语言、新运行时、新服务。全部代码用 Scala 编写编译成 jar直接作为依赖加入 Spark 作业工程。零额外部署零环境变更。一个校验规则就是一行 Spark DataFrame API 的封装没有魔法只有清晰的链式调用。声明式Declarative校验规则不是写死在代码里而是通过配置中心我们用 Apollo或外部 JSON 文件动态加载。一张表的校验规则长这样{ table_name: ods_order_detail, rules: [ {type: row_count_delta, threshold_percent: 0.01}, {type: pk_uniqueness, columns: [order_id, sku_id]}, {type: column_distribution, column: order_status, expected_ratio: {PAID: 0.85, SHIPPED: 0.12, CANCELLED: 0.03}}, {type: value_range, column: amount, min: 0.01, max: 999999.99} ] }工程师只需关注“要校验什么”不用操心“怎么实现”。规则变更无需发版重启作业即可生效。Spark 原生Spark-Native所有校验逻辑都基于 DataFrame/Dataset API 实现充分利用 Catalyst 优化器和 Tungsten 执行引擎。比如校验主键唯一性我们不 collect 后用 Map 去重而是用df.groupBy(pkCols).count().filter($count 1)让 Spark 在分布式环境下完成去重计数内存占用恒定速度极快。再比如分布校验我们用df.groupBy(col).count().withColumn(ratio, $count / lit(totalCount))全程不 shuffle 到 driver避免单点瓶颈。业务驱动Business-Driven框架本身不定义任何业务规则它只提供“校验能力容器”。真正的业务语义由数据产品经理、业务分析师和开发共同梳理沉淀为规则配置。例如“用户等级”字段的合法值必须是[LV1, LV2, LV3, LV4, LV5]这个列表来自 CRM 系统的枚举字典而不是开发拍脑袋写的。框架的价值在于把这份业务共识变成一条条可执行、可审计、可告警的机器指令。2.3 架构全景图四层结构各司其职整个框架不是单个类而是一个分层协作的体系共四层第一层规则定义层Rule Definition Layer这是业务输入的入口。支持三种方式定义规则JSON 配置文件用于离线批量校验、Apollo 配置中心用于在线动态调整、以及代码内联用于调试和特殊逻辑。所有规则最终被解析为统一的ValidationRulecase class包含ruleId、ruleType、params、severityINFO/WARN/ERROR等字段。这一层完全解耦可以轻松替换为其他配置源。第二层规则执行层Rule Execution Layer这是框架的“心脏”。它接收一个 DataFrame 和一组ValidationRule按类型分发给对应的RuleExecutor。每个 executor 是一个独立的 trait比如RowCountExecutor、UniquenessExecutor、DistributionExecutor。它们的共同契约是输入 DataFrame输出ValidationResult包含passed: Boolean、message: String、details: Map[String, Any]。关键设计是所有 executor 必须是lazy evaluation的——它们不立即触发计算而是返回一个ValidationResult的 lazy val真正触发计算是在最后汇总时才collect()或count()。这保证了即使配置了 20 条规则也只会产生 1~2 次物理执行计划避免重复扫描。第三层结果聚合层Result Aggregation Layer它将所有ValidationResult汇总生成结构化的校验报告。报告包含三个核心部分Summary总规则数、通过数、警告数、失败数、耗时Detail List每条规则的 ID、类型、状态、消息、详情如不重复主键的重复值样本Failure Snapshot仅当存在 ERROR 级别失败时自动采样 10 条违规记录保存为临时 Parquet 表供下游排查。报告格式支持 JSON供 API 调用、HTML供人工查看、以及写入 Hive 表供长期质量趋势分析。第四层集成接入层Integration Layer这是框架的“手脚”。它提供多种接入方式ValidationPipeline一个可复用的 Spark 作业模板接受输入表名、规则配置路径、输出报告路径开箱即用DataFrameValidator一个隐式类为 DataFrame 添加.validateWith(rules)方法一行代码完成校验ValidationListener一个 Spark 监听器可监听QueryExecutionEvent在每个 SQL 查询执行后自动触发预设规则实现“无感校验”。这四层之间通过纯函数式接口通信无状态、无副作用可以任意组合。比如你可以用DataFrameValidator在开发阶段快速验证单个转换逻辑也可以用ValidationPipeline在生产调度中作为独立作业运行还可以用ValidationListener全局开启让所有 SQL 查询都带上质量哨兵。3. 核心细节解析七类高频校验规则的原理与实操要点3.1 行数一致性校验Row Count Consistency这是迁移校验的“底线”。但“底线”不等于“简单”。很多人只做source.count() target.count()这在绝大多数场景下都是危险的。为什么危险空值陷阱Hive 表中count(*)统计所有行而count(col)只统计非空行。如果源表有大量 null 字段而目标表因 schema 演进默认填充了默认值如 0 或 行数可能一致但语义已变。分区裁剪失效Spark 读取分区表时若路径指定不精确如/data/year2023/month*可能漏读某些分区count()却显示正常。数据倾斜掩盖极端情况下99% 的数据集中在 1 个 partitioncount()正常但该 partition 因 OOM 而 silent fail实际数据丢失。我们的解决方案三重校验法我们不只比总数而是构建一个校验矩阵校验维度计算方式作用容忍阈值Total Countdf.count()基础行数±0.01%Non-Null PK Countdf.filter(col(pk).isNotNull).count()排除主键空值干扰±0.001%Partition-Level Countdf.groupBy(year, month).count() 比对源端分区统计验证分区完整性每个分区 ±0.005%实操中我们用一个RowConsistencyRule封装这三者。关键代码片段如下// 1. 获取总行数触发一次 action val totalCount df.count() // 2. 获取非空主键行数复用同一 physical plan避免二次扫描 val nonNullPkCount df.filter(df.col(pkColumn).isNotNull).count() // 3. 分区统计使用 cache 避免重复计算 val partitionStats df.groupBy(partitionCols: _*).count().cache() val partitionCount partitionStats.count() // 最终校验逻辑 val totalPass math.abs(totalCount - sourceTotal) / sourceTotal.toDouble threshold val nonNullPass math.abs(nonNullPkCount - sourceNonNullPk) / sourceNonNullPk.toDouble threshold * 0.1 val partitionPass partitionStats.join(sourcePartitionStats, partitionCols, full) .filter($count_left ! $count_right || $count_left.isNull || $count_right.isNull) .isEmpty注意partitionStats.cache()是关键。如果不 cache每次.count()都会重新扫描全表性能灾难。我们约定所有涉及多次计算的中间 DataFrame必须显式 cache 并在最后 uncache这是 Spark 性能调优的铁律。3.2 主键/业务键唯一性校验PK/BK Uniqueness迁移中最怕“静默去重”。源端因历史原因存在脏数据如两个完全相同的订单IDETL 过程中被 Spark 的dropDuplicates误删而没人察觉。我们的策略不追求“绝对唯一”而追求“业务可接受的唯一”我们区分两种键Technical PK数据库自增 ID、UUID必须 100% 唯一不容忍任何重复Business BK如user_idorder_date业务上允许同一天多个订单但不允许同一秒创建两个完全相同的订单需结合时间戳精度判断。校验逻辑分两步Step 1快速探查用df.groupBy(cols).count().filter($count 1)找出所有重复组。这一步不采样只看是否有重复。如果count()返回 0直接通过。Step 2深度分析若有重复再执行df.groupBy(cols).agg(collect_list(struct(*)).as(duplicates))将每组重复记录打包成一个 struct 数组。然后我们不直接报错而是计算重复组数量占总行数的比例每组重复记录的平均数量重复记录在关键业务字段如金额、状态上是否完全一致。只有当重复比例 0.0001% 且关键字段不一致时才标记为 ERROR。否则WARN 并生成报告供业务方决策是否清洗。实操心得永远不要在生产环境用df.dropDuplicates().count() df.count()来校验唯一性。前者会触发 full shuffle后者是 driver 端计算两者规模完全不在一个量级对比毫无意义。必须用 groupBy count 的分布式方式。3.3 字段分布一致性校验Column Distribution Consistency这是识别“数据漂移”的核心。比如源端user_gender字段中M占 52%F占 47%O占 1%而目标端变成了M55%F44%O1%。表面看都合理但 3% 的偏移可能意味着清洗逻辑错误如把部分F误判为M。我们的实现基于直方图的 KL 散度Kullback-Leibler Divergence我们不简单比比例而是计算两个分布的“距离”。KL 散度衡量的是用目标分布去近似源分布时信息损失有多大。值越小分布越接近。计算步骤对源表和目标表分别用df.groupBy(col).count()得到频次分布将频次归一化为概率分布 P源和 Q目标计算 KL(P||Q) Σ P(i) * log(P(i)/Q(i))其中 i 为所有取值设定阈值如 0.05超过则告警。关键优化稀疏值处理对出现频次 0.001% 的值合并为OTHER避免因噪声值导致 KL 散度虚高空值单独建模null不参与分布计算而是作为一个独立的 bin单独校验其占比是否一致动态 binning对数值型字段如年龄不按原始值分组而是先bucketize如 0-18, 19-35, 36-50, 50再计算分布避免因精度差异源端 int目标端 bigint导致 bin 数爆炸。实操中我们发现一个经典坑Hive 表的count(*)和 Spark 的count()在处理NULL时行为一致但count(col)在某些 Hive 版本中会把空字符串当作NULL计数而 Spark 不会。因此分布校验必须明确指定col.isNull || col 作为 null 判断条件保持两端语义严格对齐。3.4 数值范围与精度校验Value Range Precision迁移中字段类型变更如DECIMAL(10,2)→DOUBLE或序列化过程如 JSON 解析极易导致精度丢失。199.99变成199.98999999999998在金融场景下就是事故。我们的双保险机制保险一静态 Schema 校验在作业启动时不读数据只读元数据。用spark.catalog.getTable(source_db.table).schema和spark.catalog.getTable(target_db.table).schema对比每个字段的dataType。对数值型字段重点检查DecimalType的 precision/scale 是否一致DoubleType/FloatType是否被错误地用于货币字段应 WARNStringType是否被用于本应是TimestampType的字段如2023-01-01 12:00:00vs1672574400000。保险二动态值校验对数值型字段执行min()和max()检查是否超出业务合理范围如用户年龄min 0 or max 150approx_count_distinct()检查离散度如user_id的 distinct count 应接近总行数若远小于则可能被 hash 截断精度比对对DECIMAL字段用df.selectExpr(abs(amount - round(amount, 2)) as diff).filter(diff 0.001).count()找出所有精度超差的记录。一个真实案例某次迁移将BIGINT的订单ID转为STRING本意是兼容未来 UUID。但源端 ID 是1234567890123456789而目标端因 JavaLong.toString()在某些 Spark 版本中存在 bug被截断为1234567890123456700。静态校验发现类型变更动态校验的min/max无异常但approx_count_distinct显示 distinct count 比总行数少 12%最终定位到这个截断问题。3.5 空值率与空值模式校验Null Rate Null Pattern空值不是“没有数据”而是“一种特定的数据状态”。不同字段的空值语义天差地别user_email为空可能是未注册order_payment_time为空则意味着交易未完成是核心业务状态。我们的校验不是“空值越少越好”而是“空值模式是否符合预期”我们定义两类规则Null Rate Rule对每个字段配置一个期望的空值率区间[min, max]。例如shipping_address的期望空值率是[0.05, 0.15]5%-15% 的用户选择不填详细地址。实际值超出即告警。Null Pattern Rule校验空值是否“成组出现”。例如payment_time为空时payment_status必须为PENDING若出现payment_timenull AND payment_statusSUCCESS则为严重错误。这需要用df.filter(col(payment_time).isNull col(payment_status) ! PENDING)来探测。实操难点在于如何高效计算上百个字段的空值率暴力方案df.agg(Map(col1 - count, col2 - count, ...) )会生成巨量 shuffle。我们的方案是用df.select((sum(when(col(c).isNull, 1).otherwise(0)) / count(lit(1))).as(c _null_rate) for c - columns: _*)在一个 aggregation 中完成所有字段的 null rate 计算结果是一个单行 DataFrame再用collect()拿到 driver内存安全。注意count(lit(1))是计算总行数的最优方式比count(*)更明确且 Catalyst 优化器对其识别更好。3.6 业务逻辑一致性校验Business Logic Consistency这是校验框架的“灵魂”也是最难标准化的部分。它把业务规则翻译成 Spark 代码。典型场景与实现状态流转校验订单状态必须遵循CREATED → PAID → SHIPPED → DELIVERED禁止跳过PAID直接到SHIPPED。实现df.filter(col(status) SHIPPED !array_contains(array(CREATED, PAID), col(prev_status)))。这里prev_status是通过window函数从历史记录中获取的。金额平衡校验order_total sum(item_amount) shipping_fee - discount。实现先join订单主表和明细表再withColumn(calc_total, ...)最后filter($order_total ! $calc_total)。时间逻辑校验delivery_time shipped_time paid_time created_time。实现df.filter(!($delivery_time $shipped_time $shipped_time $paid_time $paid_time $created_time))。关键原则所有业务校验必须可逆、可解释、可修复。“可逆”校验失败的记录必须能通过where条件精准定位不能是模糊的exists子查询“可解释”失败消息必须包含具体哪条规则、哪个字段、什么值违反了什么条件如订单IDORD123456, delivery_time2023-01-01 10:00:00 shipped_time2023-01-01 11:00:00“可修复”校验逻辑本身不能修改数据它只是“照妖镜”。修复动作必须由独立的清洗作业完成。3.7 数据新鲜度与时效性校验Data Freshness Timeliness迁移不是静态快照而是持续同步。校验框架必须能回答“目标表里最新的数据是源端什么时候产生的延迟多久”我们的方案基于 watermark 的滑动窗口校验从源表中提取业务时间字段如event_time或log_time的最大值max_source_ts从目标表中提取同一字段的最大值max_target_ts计算延迟delay max_source_ts - max_target_ts校验delay expected_max_delay如 5 分钟。但这还不够。我们进一步做分区新鲜度检查目标表最新分区如dt2023-10-01是否已完整写入即该分区的count()是否达到源端对应分区的 99.9%增量延迟分布对过去 24 小时的每个同步批次计算其延迟并绘制直方图观察是否出现长尾延迟如 95% 的批次延迟 2 分钟但 5% 的批次延迟 30 分钟说明有偶发性瓶颈。实操中我们发现一个普遍误区用current_timestamp()去减max(event_time)是错误的。因为current_timestamp()是作业提交时间而数据写入是异步的。正确做法是用目标表自身的max(_commit_time)如果支持或max(hdfs_modification_time)作为“当前时间”确保时间基准一致。4. 实操过程从零搭建一个可运行的校验作业4.1 环境准备与依赖管理我们假设你的 Spark 环境是 3.3.x兼容 Scala 2.12构建工具是 sbt。框架本身不依赖任何第三方校验库只依赖 Spark Core 和 Spark SQL。sbt 依赖配置build.sbtname : spark-data-validation version : 1.0 scalaVersion : 2.12.15 libraryDependencies Seq( org.apache.spark %% spark-sql % 3.3.2 % provided, // provided 是关键避免与集群版本冲突 com.typesafe % config % 1.4.2, // 用于读取外部配置 org.slf4j % slf4j-api % 1.7.36, // 日志接口 ch.qos.logback % logback-classic % 1.4.5 % runtime // 日志实现 ) // 关键禁用 scala-library 的传递依赖避免版本冲突 dependencyOverrides Set( org.scala-lang % scala-library % 2.12.15, org.scala-lang % scala-reflect % 2.12.15 )为什么强调providedSpark 集群YARN/K8s已经提供了spark-sql的 jar 包。如果你把spark-sql打进自己的 fat jar会导致类加载冲突ClassCastException这是生产环境最常见的“作业跑不通”原因之一。provided告诉 sbt编译时需要这个包但打包时不要包含。配置文件application.confvalidation { # 默认严重级别ERROR 会中断作业WARN 只记录 default-severity ERROR # 报告输出路径支持 hdfs://, s3a://, file:// report-output-path hdfs://nameservice1/data/validation/reports # 临时快照路径用于保存失败样本 snapshot-output-path hdfs://nameservice1/data/validation/snapshots # 是否启用缓存优化 enable-cache true } # 规则配置可外置 rules-config-path hdfs://nameservice1/config/validation-rules.json4.2 核心类定义ValidationRule 与 ValidationResult这是框架的基石必须设计得足够灵活。// 规则定义 case class ValidationRule( ruleId: String, ruleType: String, // 如 row_count_delta, pk_uniqueness params: Map[String, Any], // 动态参数 severity: String ERROR, // INFO, WARN, ERROR description: String ) // 校验结果 case class ValidationResult( ruleId: String, passed: Boolean, message: String, details: Map[String, Any] Map.empty, timestamp: Long System.currentTimeMillis(), durationMs: Long 0L ) // 批量结果 case class ValidationReport( tableName: String, summary: Map[String, Any], results: Seq[ValidationResult], failureSnapshotPath: Option[String] None )设计要点params: Map[String, Any]是为了支持任意类型的参数避免为每种规则定义一个子类。虽然牺牲了一点类型安全但换来极致的扩展性。后续可通过ValidationRuleParser对常用参数做类型转换如将params(threshold_percent)转为DoubledurationMs记录每条规则的执行耗时用于性能分析。我们发现distribution规则通常比row_count慢 5-10 倍因为它需要groupBy这是预期之内的failureSnapshotPath是一个Option只有在存在 ERROR 时才生成避免无谓的 IO 开销。4.3 规则执行器RuleExecutor的实现范式所有RuleExecutor都继承自一个 traittrait RuleExecutor { def execute(df: DataFrame, rule: ValidationRule)(implicit spark: SparkSession): ValidationResult } // 示例行数校验执行器 class RowCountExecutor extends RuleExecutor { override def execute(df: DataFrame, rule: ValidationRule)(implicit spark: SparkSession): ValidationResult { val startTime System.currentTimeMillis() val sourceCount rule.params.get(source_count).map(_.toString.toLong).getOrElse(0L) val thresholdPercent rule.params.get(threshold_percent).map(_.toString.toDouble).getOrElse(0.01) try { val targetCount df.count() val delta math.abs(targetCount - sourceCount) val deltaPercent if (sourceCount 0) delta.toDouble / sourceCount else 0.0 val passed deltaPercent thresholdPercent val message if (passed) { sRow count passed. Source: $sourceCount, Target: $targetCount, Delta: ${deltaPercent * 100}% } else { sRow count failed. Source: $sourceCount, Target: $targetCount, Delta: ${deltaPercent * 100}% ${thresholdPercent * 100}% } ValidationResult( ruleId rule.ruleId, passed passed, message message, details Map(source_count - sourceCount, target_count - targetCount, delta_percent - deltaPercent), durationMs System.currentTimeMillis() - startTime ) } catch { case e: Exception ValidationResult( ruleId rule.ruleId, passed false, message sRow count execution failed: ${e.getMessage}, details Map(error - e.toString), durationMs System.currentTimeMillis() - startTime ) } } }关键技巧异常捕获必须精细不能catch Exception后吞掉必须记录e.toString否则线上排查时一片空白所有耗时计算必须包裹在try/catch内因为df.count()可能因数据倾斜、OOM 而抛出SparkExceptiondetails字段是调试金矿它会被序列化进报告运维同学可以直接看到source_count和target_count的原始值无需再查日志。4.4 构建一个完整的校验作业ValidationPipeline这是最终交付给数据工程师使用的“可执行产品”。object ValidationPipeline extends App { // 1. 初始化 SparkSession val spark SparkSession.builder() .appName(Data Validation Pipeline) .config(spark.sql.adaptive.enabled, true) // 启用 AQE对校验有益 .getOrCreate() import spark.implicits._ // 2. 解析命令行参数 val args new mutable.ArrayBuffer[String]() args Array( --source-table, hive_prod.ods.order_detail, --target-table, iceberg_prod.dwd.order_detail, --rules-config, hdfs://.../rules.json, --report-path, hdfs://.../reports/ ) val parser new scopt.OptionParser[Config](validation-pipeline) { head(Data Validation Pipeline, 1.0) opt[String](s, source-table).required().action((x, c) c.copy(sourceTable x)) opt[String](t, target-table).required().action((x, c) c.copy(targetTable x)) opt[String](r, rules-config).required().action((x, c) c.copy(rulesConfig x)) opt[String](o, report-path).required().action((x, c) c.copy(reportPath x)) } val config parser.parse(args, Config()).get // 3. 加载规则 val rules ValidationRuleLoader.loadFromHdfs(config.rulesConfig, spark) // 4. 读取源表和目标表 val sourceDf spark.read.table(config.sourceTable) val targetDf spark.read.table(config.targetTable) // 5. 执行校验 val validator new DataFrameValidator(spark) val report validator.validate(sourceDf, targetDf, rules, config.sourceTable) // 6. 输出报告 ValidationReportWriter.write(report, config.reportPath, spark) //