mqtt-plus 架构解析(六):多 Broker 管理,如何让一个应用同时连接多个 MQTT 服务

张开发
2026/4/13 4:13:17 15 分钟阅读

分享文章

mqtt-plus 架构解析(六):多 Broker 管理,如何让一个应用同时连接多个 MQTT 服务
mqtt-plus 架构解析六多 Broker 管理如何让一个应用同时连接多个 MQTT 服务摘要很多 MQTT 项目真正进入生产后都会碰到一个问题应用往往不是只连一个 broker而是同时面对云端、本地边缘、测试环境甚至不同协议接入层。mqtt-plus当前的做法不是把多 broker 做成多套系统拼接而是让每个 broker 拥有各自独立的 adapter 和连接生命周期同时让路由、监听注册、消息发布接口继续共享同一套核心模型。本文会结合MqttBrokerDefinition、MqttClientAdapterFactoryRegistry、MqttBrokerAutoConfiguration、MqttClientAdapterRegistry和DefaultMqttTemplate拆解这套结构为什么成立。项目地址项目地址https://github.com/mqttplus/mqtt-plus配套的示例工程https://github.com/mqttplus/mqtt-plus-examples如果你对这个方向感兴趣欢迎关注、试用也欢迎一起交流 issue 和 PR。如果这篇文章对你有帮助欢迎点赞、收藏也欢迎给项目一个 Star。到了第 6 篇系列要回答的问题已经不再是“单条消息怎么走”而是更偏系统结构的一层如果应用里配置了多个 broker会不会变成多套逻辑混在一起如果每个 broker 都独立那为什么又能共享同一个MqttTemplate、同一个 router、同一批 listener 定义多 broker 到底是“多个系统拼接”还是“一套核心 多个连接实例”mqtt-plus 当前给出的答案非常明确broker 隔离发生在 adapter 和连接生命周期层共享发生在 core 里的路由、注册、发布抽象和 Spring 装配层。一、这篇文章到底想回答什么这一篇只回答三个问题broker 的独立性到底落在什么对象上多个 broker 为什么还能共享同一套核心能力starter 是如何把 YAML 里的多个 broker 配置转成多个真实 adapter 实例的如果只记住一句话那就是mqtt-plus 的多 broker 设计不是复制多套框架实例而是让“每个 broker 一条连接实例链路”同时让上层路由、监听和发布模型保持统一。二、先看多 broker 的整体结构先不要急着看配置和工厂先看整体关系。Broker AAdapter ABroker BAdapter BShared RouterShared Listener RegistryShared Listener InvokerDefaultMqttTemplateMqttClientAdapterRegistry这张图里最关键的不是“有两个 broker”而是这两个事实同时成立每个 broker 对应自己的 adapter 实例adapter 的上层能力并没有跟着复制两份而是共享router、registry和template换句话说mqtt-plus 的多 broker 不是“多套框架并存”而是连接层按 broker 隔离核心模型按框架共享这也是为什么它在结构上比较轻而不是不断复制配置和 Bean。三、broker 的独立性首先落在MqttBrokerDefinition每个 broker 在 core 里都有一个明确的定义对象MqttBrokerDefinition。它至少承载了这些信息brokerIdhost/portclientIdusername/passwordcleanSessionsslEnabledkeepAliveIntervalconnectionTimeoutinboundThreadPool这一层非常重要因为它说明 mqtt-plus 不是简单拿一个字符串标记 broker而是把 broker 看成一组完整的连接定义。这也意味着 broker 的独立性不是停留在“名字不一样”而是连接参数可以不同会话策略可以不同入站线程池也可以不同特别是inboundThreadPool这一点它说明“多 broker”不只是“多地址”还包括每个 broker 在消息接入吞吐上的独立运行参数。设计决策mqtt-plus 没有把 broker 仅仅建模成一个brokerId - host:port的轻量映射而是使用MqttBrokerDefinition承载完整连接定义。这样做的重点是让“多 broker”真正拥有独立运行属性而不是只有一个逻辑标签。四、FactoryRegistry 为什么是多 broker 的关键枢纽有了MqttBrokerDefinition之后下一个问题就是这个 broker 应该用哪个 adapter 去创建这一层由 starter 里的MqttClientAdapterFactoryRegistry负责。它做的事情很集中收集所有MqttClientAdapterFactory按adapterId建立映射支持显式按 adapter 选择在没有显式指定时按 mqtt 版本和 classpath 条件自动挑选兼容实现当前真实实现里它有一个特别值得注意的策略如果没有显式指定 adapter且 classpath 中存在spring-integration且版本兼容那它会优先选择spring-integration也就是说多 broker 不是“一股脑全走同一个硬编码实现”而是每个 broker 在创建时都经过一次工厂解析。YesNoYesNoBroker Propertiesexplicit adapter?getRequiredByAdapterId(adapterId)spring-integration available and compatible?choose spring-integration factoryscan factories by mqttVersioncreate adapter instance这张图说明了一件很重要的事broker 独立不代表配置层要重复写很多分支逻辑adapter 选择逻辑被统一收敛进了 factory registry。这也是第 6 篇最核心的设计点之一。五、多个 adapter 是怎么注册起来的把配置真正变成多个运行中的 adapter发生在MqttBrokerAutoConfiguration.registerAdapters(...)里。它的主路径非常清楚遍历mqtt-plus.brokers取出每个 broker 的BrokerProperties调用toDefinition(brokerId)构造MqttBrokerDefinition通过MqttClientAdapterFactoryRegistry.resolveFactory(...)选出 factory调用factory.create(definition, inboundMessageSink)创建 adapter把所有MqttConnectionListener挂到 adapter 上注册到MqttClientAdapterRegistry调用adapter.connect()建立连接这里最值得注意的是第 7 和第 8 步之间的关系。starter 不是“连上以后再注册”而是先注册 adapter再 connect如果 connect 失败再从 registry 里移除对应测试也验证了这点连接成功时adapter 已注册且可查找连接失败时adapter 会从 registry 移除不留下脏状态这意味着多 broker 管理不仅有创建逻辑也有最基本的一致性清理逻辑。设计决策MqttBrokerAutoConfiguration先注册 adapter、再 connect、失败后回滚 registry。这种顺序让 starter 层既保留了统一注册入口又避免在连接失败时留下一个“可查询但不可用”的 broker 适配器。六、为什么说 broker 隔离和核心共享是同时成立的这一点如果只看代码很容易觉得“既然有多个 adapter那是不是路由也要有多份”答案是否定的。核心共享主要体现在三处1. 入站消息仍然走同一个 router每个 adapter 创建时都会拿到同一个MqttInboundMessageSink。也就是说虽然 adapter 有多个但它们在把消息交回框架时仍然会回到同一个入口adapter 自己负责连接 brokeradapter 收到消息后交给统一 sinksink 再进入统一 router这就是“连接独立、消息主链共享”。2. listener 注册表仍然是一份MqttListenerRegistry还是全局共享的一份注册表。它并不是“每个 broker 一份 registry”而是用brokerId topic去做 resolve。这意味着listener 模型是统一的匹配时才根据 brokerId 做隔离隔离发生在解析条件而不是 Bean 复制层3. 发布接口仍然是一个MqttTemplateDefaultMqttTemplate的做法也很直接调用方传入brokerIdtemplate 从MqttClientAdapterRegistry里查到对应 adapter再把真正的 publish 委托给那个 adapter所以从业务视角看多 broker 并不会把 API 形状搞复杂。你仍然是一个MqttTemplate多个brokerId一套统一 publish API这就是 mqtt-plus 多 broker 设计里最实用的一点对框架内部来说broker 是多个连接实例对业务接口来说broker 只是一个显式参数。七、这种设计给业务带来了什么实际好处如果把第 6 篇放回真实项目场景里看这套结构至少有 4 个直接收益。1. 多环境或多层级接入不需要复制框架能力比如cloud负责云端设备接入edge负责本地边缘消息test负责测试 broker你不需要为每个 broker 再搭一套独立消息框架只需要新增 broker 配置和对应 adapter 实例。2. 路由、监听、发布模型保持一致多 broker 场景最容易把代码写乱的地方是A broker 一套 APIB broker 另一套 APIlistener 和 publisher 都要按 broker 分叉组织mqtt-plus 现在避免了这个问题。核心 API 形状没变只是 brokerId 成了显式维度。3. adapter 可以按 broker 选择实现当前 factory registry 已经支持显式选 adapter默认按兼容性自动选择这意味着未来不是所有 broker 都必须绑定同一个 transport 实现。4. 连接失败不会污染全局 registrystarter 在connect()失败时会做回滚这对多 broker 场景尤其重要。因为一旦其中一个 broker 连不上不应该让 registry 留下一个“看起来存在、实际上不可用”的 adapter。八、这套多 broker 设计当前的边界在哪里和前面几篇一样第 6 篇也要把边界说清楚。当前 mqtt-plus 的多 broker 模型已经非常清楚但它并没有继续往更复杂的 broker 编排能力上扩展比如没有 broker 间的自动 failover 抽象没有跨 broker 的发布策略路由没有“一个主题自动镜像到多个 broker”的高级协调层现在它解决的问题更明确让一个应用可以同时持有多个 MQTT 连接实例保证这些连接在 adapter 层彼此独立保证它们在 core 层继续共享统一抽象所以它更像一个“清晰的多连接基础架构”而不是一个“多 broker 编排平台”。这个边界其实是合理的因为只要先把 broker 维度的隔离和共享建模清楚后面更复杂的协调能力才有稳定落点。九、小结第 6 篇最重要的结论其实可以压缩成两句话每个 broker 都有自己独立的MqttBrokerDefinition、adapter 实例和连接生命周期所有 broker 又共享同一套 router、listener registry、template 和 Spring 装配模型也正因为这种“下层隔离、上层共享”的结构成立mqtt-plus 才能在不把 API 搞复杂的前提下支持一个应用同时连接多个 MQTT 服务。如果说前几篇更多是在讲“单条消息怎么流”那第 6 篇回答的是另一件事当系统边界扩大到多个 broker 时框架如何保持结构稳定。下一篇会继续顺着这个方向往前走进入一个更偏运行态的问题为什么首次连接、动态订阅变更和重连恢复最终可以收敛到同一条协调路径。系列导航本文是mqtt-plus 架构解析系列的第 6/10 篇。#主题链接1总览分层架构与设计哲学链接2消息路由一条 MQTT 消息如何到达你的MqttListener链接3Payload 序列化与反序列化双链设计的取舍链接4拦截器链MqttMessageInterceptor的扩展点设计链接5错误处理ErrorAction聚合策略的设计逻辑链接6多 Broker 管理如何让一个应用同时连接多个 MQTT 服务本文7动态订阅与重连恢复Reconciler的协调机制链接8Spring Boot 自动装配零件是怎么被粘合起来的链接9测试体系MqttTestTemplate与EmbeddedBroker的设计链接10从内部项目到开源框架mqtt-plus 的抽取过程与决策链接上一篇错误处理ErrorAction聚合策略的设计逻辑下一篇动态订阅与重连恢复Reconciler的协调机制

更多文章