【架构实战】数据一致性方案:Binlog同步 vs 消息队列

张开发
2026/4/5 11:02:52 15 分钟阅读

分享文章

【架构实战】数据一致性方案:Binlog同步 vs 消息队列
一、数据一致性的挑战在分布式系统中数据往往存储在多个地方MySQL主数据 ├── Redis缓存 ├── Elasticsearch搜索 ├── ClickHouse分析 └── 其他服务的数据库核心问题当MySQL数据变更时如何保证其他存储的数据也同步更新常见方案双写应用层同时写多个存储Binlog同步监听MySQL Binlog异步同步消息队列写完MySQL后发消息消费者同步二、双写方案1. 实现方式ServiceTransactionalpublicclassProductService{AutowiredprivateProductMapperproductMapper;AutowiredprivateRedisTemplateredisTemplate;AutowiredprivateElasticsearchClientesClient;publicvoidupdateProduct(Productproduct){// 1. 更新MySQLproductMapper.updateById(product);// 2. 更新RedisredisTemplate.opsForValue().set(product:product.getId(),product);// 3. 更新ESesClient.index(i-i.index(products).id(product.getId().toString()).document(product));}}2. 双写的问题问题1原子性 MySQL更新成功Redis更新失败 → 数据不一致 问题2性能 同步写多个存储响应时间增加 问题3耦合 业务代码与存储细节耦合三、Binlog同步方案Canal1. Canal原理MySQL主库 │ │ BinlogROW格式 ▼ Canal Server伪装成MySQL从库 │ │ 解析Binlog事件 ▼ Canal Client │ ├──► Redis ├──► Elasticsearch └──► 其他存储2. Canal部署Docker方式version:3services:canal-server:image:canal/canal-server:v1.1.6container_name:canal-serverports:-11111:11111environment:canal.instance.master.address:mysql:3306canal.instance.dbUsername:canalcanal.instance.dbPassword:canalcanal.instance.filter.regex:order_db\..*volumes:-./canal-logs:/home/admin/canal-server/logsMySQL配置-- 创建Canal用户CREATEUSERcanal%IDENTIFIEDBYcanal;GRANTSELECT,REPLICATIONSLAVE,REPLICATIONCLIENTON*.*TOcanal%;FLUSHPRIVILEGES;3. Canal Client实现依赖dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.6/version/dependency消费BinlogComponentSlf4jpublicclassCanalClient{AutowiredprivateRedisTemplateredisTemplate;AutowiredprivateElasticsearchClientesClient;privateCanalConnectorconnector;PostConstructpublicvoidstart(){connectorCanalConnectors.newSingleConnector(newInetSocketAddress(canal-server,11111),example,,);connector.connect();connector.subscribe(order_db\..*);connector.rollback();// 启动消费线程ThreadthreadnewThread(this::consume);thread.setDaemon(true);thread.start();}privatevoidconsume(){while(true){try{Messagemessageconnector.getWithoutAck(100);longbatchIdmessage.getId();if(batchId-1||message.getEntries().isEmpty()){Thread.sleep(1000);continue;}processEntries(message.getEntries());connector.ack(batchId);}catch(Exceptione){log.error(Canal消费异常,e);connector.rollback();}}}privatevoidprocessEntries(ListEntryentries){for(Entryentry:entries){if(entry.getEntryType()!EntryType.ROWDATA){continue;}RowChangerowChange;try{rowChangeRowChange.parseFrom(entry.getStoreValue());}catch(Exceptione){log.error(解析Binlog失败,e);continue;}StringtableNameentry.getHeader().getTableName();EventTypeeventTyperowChange.getEventType();for(RowDatarowData:rowChange.getRowDatasList()){if(eventTypeEventType.INSERT||eventTypeEventType.UPDATE){handleUpsert(tableName,rowData.getAfterColumnsList());}elseif(eventTypeEventType.DELETE){handleDelete(tableName,rowData.getBeforeColumnsList());}}}}privatevoidhandleUpsert(StringtableName,ListColumncolumns){MapString,Stringdatacolumns.stream().collect(Collectors.toMap(Column::getName,Column::getValue));if(product.equals(tableName)){LongidLong.parseLong(data.get(id));// 更新RedisredisTemplate.opsForValue().set(product:id,data,30,TimeUnit.MINUTES);// 更新ESesClient.index(i-i.index(products).id(id.toString()).document(data));log.info(同步商品数据: {},id);}}privatevoidhandleDelete(StringtableName,ListColumncolumns){MapString,Stringdatacolumns.stream().collect(Collectors.toMap(Column::getName,Column::getValue));if(product.equals(tableName)){LongidLong.parseLong(data.get(id));// 删除RedisredisTemplate.delete(product:id);// 删除ESesClient.delete(d-d.index(products).id(id.toString()));log.info(删除商品数据: {},id);}}}4. Canal MQ模式ComponentpublicclassCanalToMqBridge{AutowiredprivateKafkaTemplatekafkaTemplate;privatevoidprocessEntries(ListEntryentries){for(Entryentry:entries){// 将Binlog事件转换为消息BinlogEventeventconvertToEvent(entry);// 发送到KafkakafkaTemplate.send(binlog-events,event);}}}KafkaListener(topicsbinlog-events)publicclassBinlogEventConsumer{publicvoidconsume(BinlogEventevent){// 根据表名和操作类型处理switch(event.getTableName()){caseproduct:syncProductToRedis(event);syncProductToEs(event);break;caseorder:syncOrderToEs(event);break;}}}四、消息队列方案1. 实现方式ServiceTransactionalpublicclassProductService{AutowiredprivateProductMapperproductMapper;AutowiredprivateKafkaTemplatekafkaTemplate;publicvoidupdateProduct(Productproduct){// 1. 更新MySQLproductMapper.updateById(product);// 2. 发送消息事务提交后发送TransactionSynchronizationManager.registerSynchronization(newTransactionSynchronizationAdapter(){OverridepublicvoidafterCommit(){kafkaTemplate.send(product:update,product);}});}}KafkaListener(topicsproduct:update)publicclassProductSyncConsumer{AutowiredprivateRedisTemplateredisTemplate;AutowiredprivateElasticsearchClientesClient;publicvoidconsume(Productproduct){// 同步到RedisredisTemplate.opsForValue().set(product:product.getId(),product);// 同步到ESesClient.index(i-i.index(products).id(product.getId().toString()).document(product));}}2. 事务消息RocketMQServicepublicclassProductService{AutowiredprivateRocketMQTemplaterocketMQTemplate;publicvoidupdateProduct(Productproduct){// 发送事务消息rocketMQTemplate.sendMessageInTransaction(product-update-topic,MessageBuilder.withPayload(product).build(),product);}}RocketMQTransactionListenerpublicclassProductTransactionListenerimplementsRocketMQLocalTransactionListener{AutowiredprivateProductMapperproductMapper;OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){try{Productproduct(Product)arg;productMapper.updateById(product);returnRocketMQLocalTransactionState.COMMIT;}catch(Exceptione){returnRocketMQLocalTransactionState.ROLLBACK;}}OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){// 检查本地事务状态Productproduct(Product)msg.getPayload();ProductdbProductproductMapper.selectById(product.getId());if(dbProduct!nulldbProduct.getVersion().equals(product.getVersion())){returnRocketMQLocalTransactionState.COMMIT;}returnRocketMQLocalTransactionState.ROLLBACK;}}五、方案对比维度双写Binlog同步消息队列实时性强准实时准实时一致性弱可能失败强中需幂等耦合度高低中复杂度低高中性能影响大小小适用场景简单场景数据同步业务解耦六、总结数据一致性是分布式系统的核心挑战双写简单但可靠性差Binlog同步可靠但复杂适合数据同步消息队列解耦但需要幂等处理推荐方案缓存同步Binlog Canal搜索同步Binlog Canal ES业务解耦消息队列 幂等消费思考题你们系统如何保证多存储之间的数据一致性

更多文章