
上一篇【第88篇】日志收集平台的Kafka实战——百亿日志的接入、传输与清洗下一篇【第90篇】Kafka在微服务中的最佳实践——事件驱动架构设计全攻略摘要数据库里一条记录变了怎么让ES、Redis、数据仓库同步感知传统的定时全量同步太重了延迟动辄几分钟到几小时。而CDCChange Data Capture变更数据捕获能把数据库的每一次增删改变成一条实时消息通过Kafka广播给所有下游——延迟降到秒级还不影响源库性能。本文深入MySQL CDC与Kafka的集成实战从Binlog原理讲起到Debezium的完整配置与调优再到Binlog事件→Kafka消息的映射机制下游消费者的幂等处理方案大表百万级数据的全量初始化策略以及让人头疼的DDL变更处理。每个环节都有可落地的配置和代码。一、CDC原理——从Binlog到Kafka的魔法1.1 什么是CDC【传统数据同步 vs CDC实时同步】 传统方案定时全量同步: ┌────────┐ 每5分钟一次 ┌────────┐ │ MySQL │───────────────►│ ES │ │ 1000万行│ 全表扫描 │ 1000万行│ └────────┘ CPU飙升 └────────┘ 问题: 延迟大、源库压力大、无法感知DELETE CDC方案实时增量同步: ┌────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ │ MySQL │ │ Binlog流 │ │ Kafka │ │ ES │ │ INSERT │──► 实时读取 │──► 消息队列 │──► 增量更新 │ │ UPDATE │ │ Binlog │ │ │ │ Redis │ │ DELETE │ │ │ │ │ │ Hive │ └────────┘ └──────────┘ └──────────┘ └────────┘ 延迟: 1秒 源库压力: 几乎为零 支持DELETECDC的核心思想很简单不去扫表而是监听数据库的变更日志。1.2 MySQL Binlog的三种格式格式记录方式优点缺点CDC适用STATEMENT记录SQL语句日志量小不确定函数NOW()等导致不一致❌ROW记录每行数据变更精确、可重放日志量大✅ 必须用ROWMIXED混合默认STATEMENT折中CDC可能丢数据❌必须将MySQL的binlog_format设置为ROW这是Debezium工作的前提。-- 检查当前binlog格式SHOWVARIABLESLIKEbinlog_format;-- 设置为ROW需要重启或动态修改SETGLOBALbinlog_formatROW;-- 同时确保binlog是开启的SHOWVARIABLESLIKElog_bin;1.3 Binlog事件类型【Binlog事件 → Kafka消息映射】 MySQL Binlog事件 Kafka消息 ────────────────────────────────────────────────── INSERT INTO users VALUES (...) → Create事件 {op: c, after: {...}} UPDATE users SET nameBob → Update事件 {op: u, before:{...}, after:{...}} DELETE FROM users WHERE id1 → Delete事件 {op: d, before: {...}} ALTER TABLE users ADD COLUMN ... → schema变更事件需要特殊处理二、Debezium配置与使用2.1 Debezium Connector架构【Debezium Kafka Connect 架构】 ┌─────────────────────────────────────────────────────┐ │ Kafka Connect Worker │ │ │ │ ┌───────────────────────────────────────────────┐ │ │ │ Debezium MySQL Connector │ │ │ │ │ │ │ │ ┌──────────────┐ ┌──────────────────┐ │ │ │ │ │ Snapshot线程 │ │ Binlog读取线程 │ │ │ │ │ │ (全量初始化) │ │ (增量同步) │ │ │ │ │ └──────┬───────┘ └────────┬─────────┘ │ │ │ │ │ │ │ │ │ │ └───────────┬───────────┘ │ │ │ │ │ │ │ │ │ ┌──────▼──────┐ │ │ │ │ │ Kafka Topic │ │ │ │ │ │ (每条表的 │ │ │ │ │ │ 变更事件) │ │ │ │ │ └─────────────┘ │ │ │ └───────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘ Snapshot: 首次启动时全量导出表数据到Kafka Binlog读取: 实时监听Binlog增量推送变更2.2 完整配置{name:mysql-orders-connector,config:{connector.class:io.debezium.connector.mysql.MySqlConnector,// MySQL连接配置 database.hostname:mysql-master,database.port:3306,database.user:debezium,database.password:${DBZ_MYSQL_PASSWORD},database.server.id:184054,database.server.name:prod-mysql,// 要监控的库和表 database.include.list:ecommerce,table.include.list:ecommerce.orders,ecommerce.order_items,ecommerce.payments,// Kafka连接配置 database.history.kafka.bootstrap.servers:broker1:9092,broker2:9092,database.history.kafka.topic:dbhistory.prod-mysql,// 全量初始化配置 snapshot.mode:initial,snapshot.locking.mode:minimal,snapshot.fetch.size:10000,// 数据类型转换 decimal.handling.mode:double,time.precision.mode:connect,// 高级配置 max.queue.size:8192,max.batch.size:2048,poll.interval.ms:500,// Topic命名策略 topic.prefix:prod-mysql,// DDL处理 include.schema.changes:true,schema.history.internal.kafka.bootstrap.servers:broker1:9092,schema.history.internal.kafka.topic:schema-history.prod-mysql}}// 部署命令curl-XPOST-HContent-Type: application/json\--data mysql-connector.json \http://connect-host:8083/connectors2.3 MySQL端权限配置-- 创建Debezium专用用户CREATEUSERdebezium%IDENTIFIEDBYstrong-password;-- 授予必要权限GRANTSELECT,RELOAD,SHOWDATABASES,REPLICATIONSLAVE,REPLICATIONCLIENTON*.*TOdebezium%;FLUSHPRIVILEGES;三、Binlog事件到Kafka消息的映射3.1 Kafka消息结构Debezium发送到Kafka的消息结构{schema:{...},payload:{before:null,// UPDATE/DELETE时有值after:{// INSERT/UPDATE时有值id:1001,user_id:U10086,amount:299.00,status:CREATED,created_at:2026-05-30T10:30:00Z},source:{// 元数据version:2.5.0.Final,connector:mysql,name:prod-mysql,ts_ms:1717059600000,snapshot:false,// 是否来自快照db:ecommerce,table:orders,server_id:1,gtid:xxx,file:mysql-bin.000003,pos:12345,row:0},op:c,// ccreate, uupdate, ddelete, rread(快照)ts_ms:1717059600000}}3.2 下游Topic命名规则Debezium自动生成的Topic规则: {topic.prefix}.{database}.{table} 示例: prod-mysql.ecommerce.orders → orders表的变更 prod-mysql.ecommerce.order_items → order_items表的变更 prod-mysql.ecommerce.payments → payments表的变更四、下游消费者幂等处理4.1 CDC消息的特性CDC消息有一个重要特性At-Least-Once语义。即同一条数据库变更可能被发送多次因为Connector故障恢复、Rebalance等。消费者必须做幂等处理。4.2 基于操作类型的幂等写入ESComponentpublicclassOrderCDCToES{privatefinalRestHighLevelClientesClient;KafkaListener(topicsprod-mysql.ecommerce.orders,groupIdes-sync-service)publicvoidhandleOrderChange(StringjsonMessage){CDCEventeventparseCDCEvent(jsonMessage);StringorderIdevent.getAfter().get(id).toString();StringesIdorder_orderId;switch(event.getOp()){casec,r-{// CREATE 或 快照READ// UPSERT 写入ES天然幂等IndexRequestrequestnewIndexRequest(orders).id(esId).source(event.getAfter(),XContentType.JSON);esClient.index(request,RequestOptions.DEFAULT);}caseu-{// UPDATE// 部分更新幂等UpdateRequestrequestnewUpdateRequest(orders,esId).doc(event.getAfter(),XContentType.JSON).upsert(event.getAfter());// 如果不存在就插入esClient.update(request,RequestOptions.DEFAULT);}cased-{// DELETE// 删除幂等删多次不会报错DeleteRequestrequestnewDeleteRequest(orders,esId);esClient.delete(request,RequestOptions.DEFAULT);}}}}4.3 写入Redis的幂等处理KafkaListener(topicsprod-mysql.ecommerce.orders,groupIdredis-cache-service)publicvoidhandleOrderChangeForRedis(StringjsonMessage){CDCEventeventparseCDCEvent(jsonMessage);StringorderIdevent.getAfter()!null?event.getAfter().get(id).toString():event.getBefore().get(id).toString();StringredisKeyorder:orderId;switch(event.getOp()){casec,r,u-{// SET操作天然幂等redisTemplate.opsForValue().set(redisKey,JSON.toJSONString(event.getAfter()),Duration.ofHours(24));}cased-{// DEL操作天然幂等redisTemplate.delete(redisKey);}}}4.4 利用source信息做精确去重对于需要严格去重的场景可以用Debezium提供的binlog位置信息publicclassCDCDeduplicator{privatefinalRedisTemplateString,Stringredis;publicbooleanisDuplicate(CDCEventevent){JsonNodesourceevent.getSource();StringdedupKeyString.format(%s:%s:%d,source.get(file).asText(),// binlog文件名source.get(pos).asLong(),// binlog位置source.get(row).asInt()// 行号);// SetNX: 返回true说明是新事件false说明已处理过return!redis.opsForValue().setIfAbsent(cdc_dedup:dedupKey,1,Duration.ofDays(7));}}五、大表全量初始化方案5.1 问题场景订单表有5000万行数据如果一次全量快照需要几个小时还可能在快照期间锁表。5.2 Debezium的快照策略snapshot.mode行为适用场景initial首次启动做全量快照之后增量新部署默认when_needed只在需要时做快照恢复数据后never永远不做快照从指定binlog位点开始schema_only只同步表结构数据已存在只同步增量initial_only只做快照然后停一次性数据导出5.3 大表全量导出的最佳实践【大表全量初始化三步骤】 Step 1: 分批快照 (Snapshot) ┌─────────────────────────────────────────┐ │ snapshot.fetch.size 20000 │ │ snapshot.locking.mode minimal │ │ │ │ 5000万行 / 20000 2500批 │ │ 每批约2秒 → 总耗时约1.5小时 │ │ │ │ 关键技术: minimal锁只在获取全局读锁的 │ │ 一瞬间锁表拿到binlog位点后立即释放 │ └─────────────────────────────────────────┘ Step 2: 增量追赶 (Catch-up) ┌─────────────────────────────────────────┐ │ 快照期间发生的变更都存在Binlog里了 │ │ 快照完成后自动从记录的位点开始回放 │ │ │ │ 最终实现: 快照数据 增量变更 完整数据 │ └─────────────────────────────────────────┘ Step 3: 并行消费加速 ┌─────────────────────────────────────────┐ │ Topic: prod-mysql.ecommerce.orders │ │ 分区数: 12 │ │ │ │ Consumer Group: es-sync-service │ │ 消费者数: 12 (每个消费一个分区) │ │ │ │ 注意: 快照阶段消息没有Key, 会轮询到 │ │ 所有分区; 增量阶段按表主键Hash分区 │ └─────────────────────────────────────────┘5.4 快照性能调优{// 每批拉取的行数增大可以减少查询次数snapshot.fetch.size:20000,// 锁策略minimal最轻量不阻塞业务写入snapshot.locking.mode:minimal,// 使用SELECT ... WHERE 分批查询避免大结果集内存溢出snapshot.select.statement.overrides:ecommerce.orders,snapshot.select.statement.overrides.ecommerce.orders:SELECT * FROM ecommerce.orders WHERE id ? ORDER BY id ASC,// 限制快照的并发查询线程snapshot.max.threads:4}六、DDL变更的优雅处理6.1 DDL变更的问题表结构变了比如加了个字段下游消费者怎么办【DDL变更场景】 ALTER TABLE orders ADD COLUMN coupon_id BIGINT; 问题: 1. 旧消息没有coupon_id字段解析会出错 2. 新消息有coupon_id字段旧消费者schema不兼容 3. 消费者代码需要更新但更新需要时间6.2 Debezium的Schema EvolutionDebezium通过Schema History Topic自动管理表结构的变更历史【Schema变更的处理流程】 ┌──────────────┐ 执行DDL: │ MySQL │ ALTER TABLE │ 执行DDL │ └──────┬───────┘ │ DDL写入Binlog ▼ ┌──────────────┐ │ Debezium │ │ 读取DDL事件 │ └──────┬───────┘ │ ┌─────────────┼─────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────┐ ┌──────────┐ ┌───────────┐ │ Schema │ │ 新Schema │ │ 数据Topic │ │ History │ │ 嵌入消息 │ │ 新消息含 │ │ Topic │ │ (可选) │ │ coupin_id │ └─────────────┘ └──────────┘ └───────────┘6.3 消费者的Schema兼容处理KafkaListener(topicsprod-mysql.ecommerce.orders)publicvoidhandleOrderChange(StringjsonMessage){JsonNodepayloadobjectMapper.readTree(jsonMessage).get(payload);if(payloadnull)return;// 获取after节点可能为nullDELETE的情况JsonNodeafterpayload.get(after);if(after!null){MapString,ObjectorderMapnewHashMap();// 用迭代器遍历所有字段自动适配DDL变更IteratorStringfieldNamesafter.fieldNames();while(fieldNames.hasNext()){StringfieldfieldNames.next();orderMap.put(field,after.get(field).asText());}// 安全获取新旧字段兼容DDL变更StringorderIdorderMap.containsKey(id)?orderMap.get(id).toString():null;StringcouponIdorderMap.getOrDefault(coupon_id,null);// 写入ES动态字段映射自动适配IndexRequestrequestnewIndexRequest(orders).id(order_orderId).source(orderMap);esClient.index(request,RequestOptions.DEFAULT);}}6.4 DDL变更最佳实践场景处理方式加字段消费者用Map/Object接收消息不过度依赖固定Pojo改字段类型先在消费者端兼容新旧类型再执行DDL删字段消费者代码先去掉对该字段的引用再执行DROP改表名在Debezium中配table.rename.format重映射到新Topic加表更新Debezium的table.include.list重启Connector七、常见坑与解决方案问题症状原因解决方案快照锁表业务写入超时snapshot.locking.mode错误使用minimal模式Binlog丢失Debezium报错找不到binlogbinlog过期被清理增加expire_logs_days或增大Kafka保留时间大事务卡住消费Lag突然暴涨一个大事务产生大量binlog拆分大事务设置max.queue.sizeGTID不一致从库切换后数据重复GTID没有启用启用GTID并使用gtid.new.channel.positionOOMDebezium内存溢出快照fetch.size太大减小fetch.size调大堆内存本篇小结MySQL CDC Kafka Debezium 是实时数据同步的王炸方案CDC核心原理把数据库BinlogROW格式变成Kafka消息流变被动轮询为主动推送Debezium配置关键binlog_formatROW是前提snapshot.modeinitial首次全量snapshot.locking.modeminimal减少锁表时间消息映射INSERT→Createafter有值、UPDATE→Updatebeforeafter、DELETE→Deletebefore有值幂等处理ES用upsert、Redis用SET/DEL天然幂等敏感场景用binlog位置做精确去重大表初始化分批快照 增量追赶 并行消费加速DDL变更消费者用动态字段遍历而非固定PojoSchema History Topic自动管理结构变更一句话总结让数据库的每一次呼吸都变成一条Kafka消息。上一篇【第88篇】日志收集平台的Kafka实战——百亿日志的接入、传输与清洗下一篇【第90篇】Kafka在微服务中的最佳实践——事件驱动架构设计全攻略