在物联网应用中使用 MQTT 而不是 HTTP?

张开发
2026/4/7 9:07:54 15 分钟阅读

分享文章

在物联网应用中使用 MQTT 而不是 HTTP?
然后还实际用 MQTT 实战了一下大家感兴趣可以看看下边是原内容前两年有幸参与到一个智能家居项目的开发由于之前都没有过这方面的开发经验所以对智能硬件的开发模式和技术栈都颇为好奇。产品是一款可燃气体报警器如果家中燃气泄露浓度到达一定阈值报警器检测到并上传气体浓度值给后台后台以电话、短信、微信等方式提醒用户家中可能有气体泄漏。用户还可能向报警器发一些关闭报警、调整音量的指令等。整体功能还是比较简单的大致的逻辑如下图所示但当我真正的参与其中开发时其实有一点小小的失望因为在整个研发过程中并没用到什么新的技术还是常规的几种中间件只不过换个用法而已。技术选型用rabbitmq来做核心的组件主要考虑到运维成本低组内成员使用的熟练度比较高。下面和小伙伴分享一下如何用springbootrabbitmq搭建物联网IOT平台其实智能硬件也没想象的那么高不可攀很多小伙伴可能有点懵rabbitmq不是消息队列吗怎么又能做智能硬件了其实rabbitmq有两种协议我们平时接触的消息队列是用的AMQP协议而用在智能硬件中的是MQTT协议。一、什么是 MQTT协议MQTT全称(Message Queue Telemetry Transport)一种基于发布/订阅publish/subscribe模式的轻量级通讯协议通过订阅相应的主题来获取消息是物联网Internet of Thing中的一个标准传输协议。该协议将消息的发布者publisher与订阅者subscriber进行分离因此可以在不可靠的网络环境中为远程连接的设备提供可靠的消息服务使用方式与传统的MQ有点类似。TCP协议位于传输层MQTT协议位于应用层MQTT协议构建于TCP/IP协议上也就是说只要支持TCP/IP协议栈的地方都可以使用MQTT协议。二、为什么要用 MQTT协议MQTT协议为什么在物联网IOT中如此受偏爱而不是其它协议比如我们更为熟悉的HTTP协议呢首先HTTP协议它是一种同步协议客户端请求后需要等待服务器的响应。而在物联网IOT环境中设备会很受制于环境的影响比如带宽低、网络延迟高、网络通信不稳定等显然异步消息协议更为适合IOT应用程序。HTTP是单向的如果要获取消息客户端必须发起连接而在物联网IOT应用程序中设备或传感器往往都是客户端这意味着它们无法被动地接收来自网络的命令。通常需要将一条命令或者消息发送到网络上的所有设备上。HTTP要实现这样的功能不但很困难而且成本极高。三、MQTT协议介绍前边说过MQTT是一种轻量级的协议它只专注于发消息 所以此协议的结构也非常简单。MQTT数据包在MQTT协议中一个MQTT数据包由固定头Fixed header、可变头Variable header、消息体payload三部分构成。固定头Fixed header所有数据包中都有固定头包含数据包类型及数据包的分组标识。可变头Variable header部分数据包类型中有可变头。内容消息体Payload存在于部分数据包类是客户端收到的具体消息内容。1、固定头固定头部使用两个字节共16位(4-7)位表示消息类型使用4位二进制表示可代表如下的16种消息类型不过 0 和 15位置属于保留待用所以共14种消息事件类型。DUP Flag重试标识DUP Flag保证消息可靠传输消息是否已送达的标识。默认为0只占用一个字节表示第一次发送当值为1时表示当前消息先前已经被传送过。QoS Level消息质量等级QoS Level消息的质量等级后边会详细介绍RETAIN持久化值为1表示发送的消息需要一直持久保存而且不受服务器重启影响不但要发送给当前的订阅者且以后新加入的客户端订阅了此Topic订阅者也会马上得到推送。注意新加入的订阅者只会取出最新的一个RETAIN flag 1的消息推送。值为0仅为当前订阅者推送此消息。Remaining Length剩余长度在当前消息中剩余的byte(字节)数包含可变头部和消息体payload。2、可变头固定头部仅定义了消息类型和一些标志位一些消息的元数据需要放入可变头部中。可变头部内容字节长度 消息体payload 剩余长度。可变头部居于固定头部和payload中间包含了协议名称版本号连接标志用户授权心跳时间等内容。可变头存在于这些类型的消息PUBLISH (QoS 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。3、消息体payload消息体payload只存在于CONNECT、PUBLISH、SUBSCRIBE、SUBACK、UNSUBSCRIBE这几种类型的消息CONNECT包含客户端的ClientId、订阅的Topic、Message以及用户名和密码。PUBLISH向对应主题发送消息。SUBSCRIBE要订阅的主题以及QoS。SUBACK服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。UNSUBSCRIBE取消要订阅的主题。消息质量QoS 消息质量Quality of Service即消息的发送质量发布者publisher和订阅者subscriber都可以指定qos等级有QoS 0、QoS 1、QoS 2三个等级。下边分别说明一下这三个等级的区别。1、Qos 0At most once至多一次只发送一次消息不保证消息是否成功送达没有确认机制消息可能会丢失或重复。2、Qos 1At least once至少一次相对于QoS 0而言Qos 1增加了ack确认机制发送者publisher推送消息到MQTT代理broker时两者自身都会先持久化消息只有当publisher或者Broker分别收到PUBACK确认时才会删除自身持久化的消息否则就会重发。但有个问题尽管我们可以通过确认来保证一定收到客户端 或 服务器的message可我们却不能保证仅收到一次message也就是当客户端publisher没收到Broker的puback或者Broker没有收到subscriber的puback那么就会一直重发。publisher - broker 大致流程publisher store msg - publish -broker 传递messagebroker - puback - publisher delete msg 确认传递成功3、Qos 2Exactly once只有一次相对于QoS 1QoS 2升级实现了仅接受一次messagepublisher和broker同样对消息进行持久化其中publisher缓存了message和 对应的msgID而broker缓存了msgID可以保证消息不重复由于又增加了一个confirm机制整个流程变得复杂很多。publisher - broker 大致流程publisher store msg - publish -broker - broker storemsgID传递message broker - puberc 确认传递成功publisher - pubrel -broker delete msgID 告诉broker删除msgIDbroker - pubcomp - publisher delete msg 告诉publisher删除msgLWT最后遗嘱LWT全称为Last Will and Testament其实遗嘱是一个由客户端预先定义好的主题和对应消息附加在CONNECT的数据包中包括遗愿主题、遗愿 QoS、遗愿消息等。当MQTT代理Broker检测到有客户端client非正常断开连接时再由服务器主动发布此消息然后相关的订阅者会收到消息。举个栗子聊天室中所有人都订阅一个叫talk的主题 但小富由于网络抖动突然断开了链接这时聊天室中所有订阅主题talk的客户端都会收到一个 “小富离开聊天室” 的遗愿消息。遗嘱的相关参数Will Flag是否使用 LWT1 开启Will Topic遗愿主题名不可使用通配符Will Qos发布遗愿消息时使用的 QoSWill Retain遗愿消息的 Retain 标识Will Message遗愿消息内容那客户端Client有哪些场景是非正常断开连接呢Broker检测到底层的 I/O 异常客户端 未能在心跳Keep Alive的间隔内和Broker进行消息交互客户端 在关闭底层TCP连接前没有发送DISCONNECT数据包客户端 发送错误格式的数据包到Broker导致关闭和客户端的连接等。注意当客户端通过发布DISCONNECT数据包断开连接时属于正常断开连接并不会触发LWT的机制与此同时Broker还会丢弃掉当前客户端在连接时指定的相关LWT参数。四、MQTT协议应用场景MQTT协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。使用的场景也是非常非常多下边列举一些物联网M2M通信物联网大数据采集Android消息推送WEB消息推送移动即时消息例如Facebook Messenger智能硬件、智能家具、智能电器车联网通信电动车站桩采集智慧城市、远程医疗、远程教育电力、石油与能源等行业市场五、代码实现具体rabbitmq的环境搭建就不赘述了网上教程比较多有条件的用服务器没条件的像我搞个Windows版的也很快乐嘛。1、启用 rabbitmq的mqtt协议我们先开启rabbitmq的mqtt协议因为默认安装下是关闭的命令如下rabbitmq-plugins enable rabbitmq_mqtt2、mqtt 客户端依赖包上一步中安装rabbitmq环境并开启mqtt协议后实际上mqtt消息代理服务就搭建好了接下来要做的就是实现客户端消息的推送和订阅。这里使用spring-integration-mqtt、org.eclipse.paho.client.mqttv3两个工具包实现。!--mqtt依赖包-- dependency groupIdorg.springframework.integration/groupId artifactIdspring-integration-mqtt/artifactId /dependency dependency groupIdorg.eclipse.paho/groupId artifactIdorg.eclipse.paho.client.mqttv3/artifactId version1.2.0/version /dependency3、消息发送者消息的发送比较简单主要是应用到ServiceActivator注解需要注意messageHandler.setAsync属性如果设置成false关闭异步模式发送消息时可能会阻塞。Configuration public class IotMqttProducerConfig { Autowired private MqttConfig mqttConfig; Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory new DefaultMqttPahoClientFactory(); factory.setServerURIs(mqttConfig.getServers()); return factory; } Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } Bean ServiceActivator(inputChannel iotMqttInputChannel) public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory()); messageHandler.setAsync(false); messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic()); return messageHandler; } }MQTT对外提供发送消息的API时需要使用MessagingGateway注解去提供一个消息网关代理参数defaultRequestChannel指定发送消息绑定的channel。可以实现三种API接口payload为发送的消息topic发送消息的主题qos消息质量。MessagingGateway(defaultRequestChannel iotMqttInputChannel) public interface IotMqttGateway { // 向默认的 topic 发送消息 void sendMessage2Mqtt(String payload); // 向指定的 topic 发送消息 void sendMessage2Mqtt(String payload,Header(MqttHeaders.TOPIC) String topic); // 向指定的 topic 发送消息并指定服务质量参数 void sendMessage2Mqtt(Header(MqttHeaders.TOPIC) String topic, Header(MqttHeaders.QOS) int qos, String payload); }4、消息订阅消息订阅和我们平时用的MQ消息监听实现思路基本相似ServiceActivator注解表明当前方法用于处理MQTT消息inputChannel参数指定了用于接收消息的channel。/** * Author: xiaofu * Description: 消息订阅配置 * date 2020/6/8 18:24 */ Configuration public class IotMqttSubscriberConfig { Autowired private MqttConfig mqttConfig; Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory new DefaultMqttPahoClientFactory(); factory.setServerURIs(mqttConfig.getServers()); return factory; } Bean public MessageChannel iotMqttInputChannel() { return new DirectChannel(); } Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic()); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(iotMqttInputChannel()); return adapter; } /** * author xiaofu * description 消息订阅 * date 2020/6/8 18:20 */ Bean ServiceActivator(inputChannel iotMqttInputChannel) public MessageHandler handlerTest() { return message - { try { String string message.getPayload().toString(); System.out.println(接收到消息 string); } catch (MessagingException ex) { //logger.info(ex.getMessage()); } }; } }六、测试消息额~ 由于本渣渣对硬件一窍不通为了模拟硬件的发送消息只能借助一下工具其实硬件端实现MQTT协议跟我们前边的基本没什么区别只不过换种语言嵌入到硬件中而已。这里选的测试工具为mqttbox下载地址http://workswithweb.com/mqttbox.html1、测试消息发送我们用先用mqttbox模拟向主题mqtt_test_topic发送消息看后台是否能成功接收到。看到后台成功拿到了向主题mqtt_test_topic发送的消息。2、测试消息订阅用mqttbox模拟订阅主题mqtt_test_topic在后台向主题mqtt_test_topic发送一条消息这里我简单的写了个controller调用API发送消息。http://127.0.0.1:8080/fun/testMqtt?topicmqtt_test_topicmessage我是后台向主题 mqtt_test_topic 发送的消息我们看mqttbox的订阅消息已经成功的接收到了后台的消息到此我们的MQTT通信环境就算搭建成功了。如果把mqttbox工具换成具体硬件设备整个流程就是我们常说的智能家居了其实真的没那么难。七、应用注意事项在我们实际的生产环境中遇到过的问题这里分享一下让大家少踩坑。clientId 要唯一在客户端connect连接的时会有一个clientId参数需要每个客户端都保持唯一的。但我们在开发测试阶段clientId直接在代码中写死了而且服务都是单实例部署并没有暴露出什么问题。MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());然而在生产环境内侧的时候由于服务是多实例集群部署结果出现了下边的奇怪问题。同一时间内只能有一个客户端能拿到消息其他客户端不但不能消费消息而且还在不断的掉线重连Lost connection: 已断开连接; retrying...。这就是由于clientId相同导致客户端间相互竞争消费最后将clientId获取方式换成从发号器中拿问题就好了所以这个地方是需要特别注意的。平时程序在开发环境没问题可偏偏到了生产环境就一大堆问题很多都是因为服务部署方式不同导致的。所以多学习分布式还是很有必要的。八、其他中间件MQTT它只是一种协议支持MQTT协议的消息中间件产品非常多下边的也只是其中的一部分MosquittoEclipse PahoRabbitMQApache ActiveMQHiveMQJoramMQThingMQVerneMQApache Apolloemqttd XivelyIBM Websphere …..总结我也是第一次做和硬件相关的项目之前听到智能家居都会觉得好高大上但实际上手开发后发现技术嘛万变不离其宗也只是换种用法而已。

更多文章