锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. JAVA
  4. Springboot整合mqtt最新教程,完整教程,最佳实践

Springboot整合mqtt最新教程,完整教程,最佳实践

0
  • JAVA
  • 发布于 2024-08-06
  • 0 次阅读
黄健
黄健

原文链接:https://blog.csdn.net/qq_21017997/article/details/135086796

前言:

  关于整合mqtt网上的教程很多,但大部分都是cv来cv去,中间的监听代码也没有讲清楚。教程也是很久之前的。所以决定自己来写一个教程。废话不多说直接开始教程。

本文只有教程,没有其他废话,如果需要请留言,后续更新下一版(包括主题消息的订阅方式改变,其他断线重连方式,EMQX的API对接-监听设备更加方便)。

Springboot整合mqtt采用注解进行监听(第二篇)

第一步安装 EMQX:

  MQTT服务用的是EMQX,安装方式请搜索EMQX,移步他们官网。

  官网地址:https://www.emqx.io/zh

  测试工具请下载MQTTX

  安装完成后默认管理地址:ip:18083 账户:admin 密码:public

第二步加入依赖:

  <!-- MQTT相关配置 -->

        <dependency>

            <groupId>org.springframework.integration</groupId>

            <artifactId>spring-integration-core</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-integration</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.integration</groupId>

            <artifactId>spring-integration-stream</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.integration</groupId>

            <artifactId>spring-integration-mqtt</artifactId>

        </dependency>

第三步加入配置:

  在yml文件中加入以下配置

#MQTT客户端

publish:

    mqtt:

        host: tcp://127.0.0.1:1883

        clientId: mqtt_publish

        options:

            userName: GuoShun

            password: qq1101165230

            # 这里表示会话不过期

            cleanSession: false

            # 配置一个默认的主题,加载时不会用到,只能在需要时手动提取

            defaultTopic: devops

            timeout: 1000

            KeepAliveInterval: 10

            #断线重连方式,自动重新连接与会话不过期配合使用会导致

            #断线重新连接后会接收到断线期间的消息。需要更改设置请看password联系我

            automaticReconnect: true

            connectionTimeout: 3000

            # 最大链接数

            maxInflight: 100

第四步创建一个MQTTConfigBuilder类,用来加载yml中的配置

/**

 * @author by Guoshun

 * @version 1.0.0

 * @description mqtt配置类

 * @date 2023/12/12 15:10

 */

@Configuration

@ConfigurationProperties(MQTTConfigBuilder.PREFIX)

@Data

public class MQTTConfigBuilder {

	

	//配置的名称

    public static final String PREFIX = "publish.mqtt";

    /**

     * 服务端地址

     */

    private String host;

    /**

     * 客户端id

     */

    private String clientId;

    /**

     * 配置链接项

     */

    private MqttConnectOptions options;

}

第五步创建一个MQTTClientUtils

@Slf4j

@Configuration

public class MQTTClientUtils {

    @Autowired

    private MQTTConfigBuilder mqttConfig;

    private MqttClient mqttClient;

    public MQTTClientUtils createDevOpsMQTTClient() {

        this.createMQTTClient();

        return this;

    }

    private MQTTClientUtils connect() {

        try {

            this.mqttClient.connect(mqttConfig.getOptions());

            log.info("MQTTClient连接成功!");

        }catch (MqttException mqttException){

            mqttException.printStackTrace();

            log.error("MQTTClient连接失败!");

        }

        return this;

    }

    private MqttClient createMQTTClient() {

        try{

            this.mqttClient = new MqttClient( mqttConfig.getHost(), mqttConfig.getClientId());

            log.info("MQTTClient创建成功!");

            return this.mqttClient;

        }catch (MqttException exception){

            exception.printStackTrace();

            log.error("MQTTClient创建失败!");

            return null;

        }

    }

    /**

     * 消息发送

     * @param topicName

     * @param message

     * @return

     */

    public boolean publish(String topicName, String message) {

        log.info("订阅主题名:{}, message:{}", topicName, message);

        MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));

        try {

            this.mqttClient.publish(topicName, mqttMessage);

            return true;

        }catch (MqttException exception){

            exception.printStackTrace();

            return false;

        }

    }

    /**

     * 消息发送 : retained 默认为 false

     * "retained message" 指的是 Broker 会保留的最后一条发布到某个主题的消息。

     * 当新的订阅者连接到该主题时,Broker 会将这条保留消息立即发送给订阅者,即使在订阅者订阅时该消息并未被重新发布。

     * 这对于一些需要初始状态或者最后一次已知状态的应用场景非常有用。

     * @param topicName

     * @param message

     * @param qos

     * @return

     */

    public boolean publish(String topicName, int qos, String message) {

        log.info("主题名:{}, qos:{}, message:{}", topicName, qos, message);

        MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));

        try {

            this.mqttClient.publish(topicName, mqttMessage.getPayload(), qos, false);

            return true;

        }catch (MqttException exception){

            exception.printStackTrace();

            return false;

        }

    }

    /**

     * 订阅某个主题

     *

     * @param topicName

     * @param qos

     */

    public void subscribe(String topicName, int qos) {

        log.info("订阅主题名:{}, qos:{}", topicName, qos);

        try {

            this.mqttClient.subscribe(topicName, qos);

        } catch (MqttException e) {

            e.printStackTrace();

        }

    }

    /**

     * 订阅某个主题

     *

     * @param topicName

     * @param qos

     */

    public void subscribe(String topicName, int qos, IMqttMessageListener messageListener) {

        log.info("订阅主题名:{}, qos:{}, Listener类:{}", topicName, qos, messageListener.getClass());

        try {

            this.mqttClient.subscribe(topicName, qos, messageListener);

        } catch (MqttException e) {

            e.printStackTrace();

        }

    }

    /**

     * 取消订阅主题

     * @param topicName 主题名称

     */

    public void cleanTopic(String topicName) {

        log.info("取消订阅主题名:{}", topicName);

        try {

            this.mqttClient.unsubscribe(topicName);

        } catch (MqttException e) {

            e.printStackTrace();

        }

    }

//这里是初始化方法

    @PostConstruct

    public void initMqttClient(){

        //创建连接

        MQTTClientUtils mqttClientUtils = this.createDevOpsMQTTClient().connect();

        //这里主要是项目启动时订阅一些主题。看个人需要使用

        //mqttClientUtils.subscribe("test/#", 2, new HeartBeatListener());

        //MessageCallbackListener订阅主题,接受到该主题消息后交给MessageCallbackListener去处理

         mqttClientUtils.subscribe("message/call/back", 2, new MessageCallbackListener());

        //需要注意的是new MessageCallbackListener()虽然会接收到消息,但这么做不对。

        //举个简单列子:就是你有切面对MessageCallbackListener中重写的方法做一些其他操作,

        //那么接收到消息后该切面并不会生效,所以不建议这么做,以下是修改过后的。

      	//@Resource

    	//private MessageCallbackListener messageCallbackListener;

    	//上面两句放到外面好吧!!!评论的大哥想的有点多,还要写个上下文获取bean...

    	//mqttClientUtils.subscribe("message/call/back", 2, messageCallbackListener);

    }

}

使用方式创建一个MQTTService

 可以MQTTClientUtils去调用方法,也可以将MQTTService 扩展开来,使用MQTTService 去调用方法

/**

 * @author by Guoshun

 * @version 1.0.0

 * @description MQTT服务类,负责调用发送消息

 * @date 2023/12/12 16:53

 */

@Service

public class MQTTService {

    @Resource

    private MQTTClientUtils mqttClientUtils;

    /**

     * 向主题发送消息

     * @param topicName

     * @param message

     */

    public void sendMessage(String topicName, String message){

        mqttClientUtils.publish(topicName, message);

    }

    /**

     * 向主题发送消息

     * @param topicName 主题名称

     * @param qos qos

     * @param message 具体消息

     */

    public void sendMessage(String topicName,int qos, String message){

        mqttClientUtils.publish(topicName, qos, message);

    }

}

消息监听处理(简单实现)

/**

 * @author by Guoshun

 * @version 1.0.0

 * @description 消息回调返回

 * @date 2023/12/12 17:27

 */

@Component

public class MessageCallbackListener implements IMqttMessageListener {

    @Override

    public void messageArrived(String topic, MqttMessage message) throws Exception {

        String messageBody = new String(message.getPayload(), StandardCharsets.UTF_8);

        System.out.println("收到消息:"+topic+", 消息内容是:"+ messageBody);

    }

}

标签: #JAVA 991
相关文章

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

SpringBoot整合异步任务执行 2024-10-08 11:24

同步任务: 同步任务是在单线程中按顺序执行,每次只有一个任务在执行,不会引发线程安全和数据一致性等 并发问题 同步任务需要等待任务执行完成后才能执行下一个任务,无法同时处理多个任务,响应慢,影响用 户体验 异步任务: 异步任务是在多线程中同时执行,多个任务可以并发执行,同时处理多个请求,响应快,资源

springboot kafka多数据源,通过配置动态加载发送者和消费者 2024-10-08 11:24

前言 最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是如果需要不修改代码扩展呢,因为kafka本身不处理额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。 准备test kafka本身非常容易上手,如果我们需要单元测试,引入ja

SpringBoot 集成 Redis 2024-10-08 11:24

一:SpringBoot 集成 Redis ①Redis是一个 NoSQL(not only)数据库, 常作用缓存 Cache 使用。 ②Redis是一个中间件、是一个独立的服务器;常用的数据类型: string , hash ,set ,zset , list ③通过Redis客户端可以使用多种语

SpringBoot整合QQ邮箱 2024-10-08 11:24

SpringBoot可以通过导入依赖的方式集成多种技术,这当然少不了我们常用的邮箱,现在本章演示SpringBoot整合QQ邮箱发送邮件…. 下面按步骤进行: 1.获取QQ邮箱授权码 1.1 登录QQ邮箱 1.2 开启SMTP服务 找到下图中的SMTP服务区域,如果当前账号未开启的话自己手动开启。

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.