锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. JAVA
  4. 在Spring Boot项目中集成和使用MQTT

在Spring Boot项目中集成和使用MQTT

0
  • JAVA
  • 发布于 2024-08-06
  • 0 次阅读
黄健
黄健

原文链接:https://blog.csdn.net/mbh12333/article/details/139144711

在物联网(IoT)应用中,MQTT(消息队列遥测传输)协议因其轻量级和高效性被广泛使用。在Spring Boot项目中,我们可以通过集成org.springframework.integration:spring-integration-mqtt依赖来实现对MQTT的支持。本文将逐步介绍如何在Spring Boot应用中使用MQTT。

1. 添加依赖

首先,我们需要在项目的pom.xml文件中添加Spring Integration MQTT的依赖:

<dependencies>

    <!-- Spring Boot Starter -->

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter</artifactId>

    </dependency>

    <!-- Spring Integration MQTT -->

    <dependency>

        <groupId>org.springframework.integration</groupId>

        <artifactId>spring-integration-mqtt</artifactId>

    </dependency>

    <!-- MQTT Client Library (Paho) -->

    <dependency>

        <groupId>org.eclipse.paho</groupId>

        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>

    </dependency>

</dependencies>

2. 配置MQTT

在Spring Boot应用的配置文件application.properties中添加MQTT相关配置:

mqtt.broker.url=tcp://localhost:1883

mqtt.client.id=spring-boot-mqtt-client

mqtt.username=your-username

mqtt.password=your-password

mqtt.default.topic=your/topic

3. 创建MQTT配置类

创建一个新的配置类,用于配置MQTT连接和消息处理:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.integration.annotation.ServiceActivator;

import org.springframework.integration.channel.DirectChannel;

import org.springframework.integration.core.MessageProducer;

import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;

import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;

import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.MessageHandler;

@Configuration

public class MqttConfig {

    @Bean

    public MqttPahoClientFactory mqttClientFactory() {

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();

        MqttConnectOptions options = new MqttConnectOptions();

        options.setServerURIs(new String[] { "tcp://localhost:1883" });

        options.setUserName("your-username");

        options.setPassword("your-password".toCharArray());

        factory.setConnectionOptions(options);

        return factory;

    }

    @Bean

    public MessageChannel mqttInputChannel() {

        return new DirectChannel();

    }

    @Bean

    public MessageProducer inbound() {

        MqttPahoMessageDrivenChannelAdapter adapter =

                new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound",

                        mqttClientFactory(), "your/topic");

        adapter.setCompletionTimeout(5000);

        adapter.setConverter(new DefaultPahoMessageConverter());

        adapter.setQos(1);

        adapter.setOutputChannel(mqttInputChannel());

        return adapter;

    }

    @Bean

    @ServiceActivator(inputChannel = "mqttInputChannel")

    public MessageHandler handler() {

        return message -> {

            String payload = (String) message.getPayload();

            System.out.println("Received message: " + payload);

        };

    }

    @Bean

    public MessageChannel mqttOutboundChannel() {

        return new DirectChannel();

    }

    @Bean

    @ServiceActivator(inputChannel = "mqttOutboundChannel")

    public MessageHandler mqttOutbound() {

        MqttPahoMessageHandler messageHandler =

                new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound", mqttClientFactory());

        messageHandler.setAsync(true);

        messageHandler.setDefaultTopic("your/topic");

        return messageHandler;

    }

}

4. 发送和接收消息

在你的服务或控制器中,可以使用如下方法发送消息:

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.integration.support.MessageBuilder;

import org.springframework.messaging.MessageChannel;

import org.springframework.stereotype.Service;

@Service

public class MqttMessageSender {

    @Autowired

    private MessageChannel mqttOutboundChannel;

    public void sendMessage(String topic, String payload) {

        mqttOutboundChannel.send(MessageBuilder.withPayload(payload)

                .setHeader("mqtt_topic", topic)

                .build());

    }

}

要接收消息,可以配置handler方法中的处理逻辑,或将消息发送到另一个Spring Integration通道进行进一步处理。

5. 使用示例

在一个控制器中调用发送消息方法:

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestParam;

import org.springframework.web.bind.annotation.RestController;

@RestController

public class MqttController {

    @Autowired

    private MqttMessageSender mqttMessageSender;

    @GetMapping("/send")

    public String send(@RequestParam String topic, @RequestParam String message) {

        mqttMessageSender.sendMessage(topic, message);

        return "Message sent to topic " + topic;

    }

}

这样,你就可以通过HTTP请求发送MQTT消息了。例如,访问http://localhost:8080/send?topic=test/topic&message=Hello,将消息发送到MQTT主题test/topic。

这就是一个完整的Spring Boot应用中集成MQTT的简单示例,希望对你有所帮助!

MQTT报文头介绍

MQTT协议的请求报文头非常轻量级。MQTT协议定义了固定报文头和可变报文头两部分。以下是各类报文的基本格式:

固定报文头

所有MQTT报文都有一个固定报文头,占据2-5个字节。固定报文头包含报文类型和一些控制标志。

固定报文头格式

第一个字节:

位7-4:报文类型(Message Type)

位3-0:标志(Flags),根据报文类型不同而不同

第二个字节及后续字节:

剩余长度(Remaining Length),表示剩余报文的字节数。采用可变长度编码,每个字节的最高位用于指示是否有后续字节。

各类报文示例

连接报文(CONNECT)

连接报文用于客户端请求与服务器建立连接。其报文头如下:

固定报文头:

第一个字节:0x10(CONNECT报文类型是1,标志位为0000)

第二个字节:剩余长度(根据可变部分长度而定)

可变报文头:

协议名(“MQTT”)

协议级别(4,表示MQTT 3.1.1)

连接标志(Connect Flags)

保持连接时间(Keep Alive)

有效载荷:

客户端标识符(Client Identifier)

用户名(可选)

密码(可选)

遗嘱主题(可选)

遗嘱消息(可选)

连接确认报文(CONNACK)

服务器响应客户端的连接请求。其报文头如下:

固定报文头:

第一个字节:0x20(CONNACK报文类型是2,标志位为0000)

第二个字节:剩余长度(2字节)

可变报文头:

连接确认标志(0x00或0x01)

返回码(0表示连接成功,其他值表示错误)

发布报文(PUBLISH)

客户端或服务器发送消息到指定主题。其报文头如下:

固定报文头:

第一个字节:0x30(PUBLISH报文类型是3,标志位根据QoS等级、重复标志和保留标志变化)

第二个字节:剩余长度(根据主题名、消息ID和消息体长度而定)

可变报文头:

主题名(Topic Name)

消息ID(QoS等级为1或2时需要)

有效载荷:

消息内容

示例

以下是一个PUBLISH报文的示例:

30 0B                # 固定报文头 (PUBLISH,QoS 0)

00 05                # 主题名长度

74 6F 70 69 63       # 主题名 "topic"

68 65 6C 6C 6F       # 消息内容 "hello"

在这个示例中:

第一个字节 0x30 表示这是一个PUBLISH报文,QoS等级为0,重复标志和保留标志为0。

第二个字节 0x0B 表示剩余长度为11个字节。

接下来的两个两个字节 0x00 0x05 表示主题名的长度为5个字节。

接下来的5个字节 0x74 0x6F 0x70 0x69 0x63 表示主题名 “topic”。

最后5个字节 0x68 0x65 0x6C 0x6C 0x6F 表示消息内容 “hello”。

这种结构使得MQTT报文非常紧凑和高效,特别适合物联网设备的通信。希望这篇文章能帮助你更好地理解和使用MQTT协议。

标签: #JAVA 991
相关文章

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

SpringBoot整合异步任务执行 2024-10-08 11:24

同步任务: 同步任务是在单线程中按顺序执行,每次只有一个任务在执行,不会引发线程安全和数据一致性等 并发问题 同步任务需要等待任务执行完成后才能执行下一个任务,无法同时处理多个任务,响应慢,影响用 户体验 异步任务: 异步任务是在多线程中同时执行,多个任务可以并发执行,同时处理多个请求,响应快,资源

springboot kafka多数据源,通过配置动态加载发送者和消费者 2024-10-08 11:24

前言 最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是如果需要不修改代码扩展呢,因为kafka本身不处理额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。 准备test kafka本身非常容易上手,如果我们需要单元测试,引入ja

SpringBoot 集成 Redis 2024-10-08 11:24

一:SpringBoot 集成 Redis ①Redis是一个 NoSQL(not only)数据库, 常作用缓存 Cache 使用。 ②Redis是一个中间件、是一个独立的服务器;常用的数据类型: string , hash ,set ,zset , list ③通过Redis客户端可以使用多种语

SpringBoot整合QQ邮箱 2024-10-08 11:24

SpringBoot可以通过导入依赖的方式集成多种技术,这当然少不了我们常用的邮箱,现在本章演示SpringBoot整合QQ邮箱发送邮件…. 下面按步骤进行: 1.获取QQ邮箱授权码 1.1 登录QQ邮箱 1.2 开启SMTP服务 找到下图中的SMTP服务区域,如果当前账号未开启的话自己手动开启。

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.