锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. JAVA
  4. Spring Boot整合STOMP实现实时通信

Spring Boot整合STOMP实现实时通信

0
  • JAVA
  • 发布于 2024-08-14
  • 0 次阅读
黄健
黄健

本文由 简悦 SimpRead 转码, 原文地址 blog.csdn.net

目录

引言

代码实现

配置类 WebSocketMessageBrokerConfig

DTO

工具类

Controller

common.html

stomp-broadcast.html

运行效果

完整代码地址

引言

STOMP(Simple Text Oriented Messaging Protocol)作为一种简单文本导向的消息传递协议,提供了一种轻量级且易于使用的方式来实现实时通信。本篇博客将讲解如何使用 Spring Boot 创建一个基于 STOMP 的 WebSocket 应用程序,并展示相关的配置类。同时,还会介绍如何使用 Thymeleaf 模板引擎生成动态的 HTML 页面,以展示实时通信的效果。

代码实现

配置类 WebSocketMessageBrokerConfig

package com.wsl.websocket.config;
 
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
 
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {
 
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }
 
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // it is OK to leave it here
        registry.addEndpoint("/broadcast");
        //registry.addEndpoint("/broadcast").withSockJS();
        // custom heartbeat, every 60 sec
        //registry.addEndpoint("/broadcast").withSockJS().setHeartbeatTime(60_000);
    }
}

 DTO

package com.wsl.websocket.dto;
 
import com.wsl.websocket.util.StringUtils;
import lombok.Getter;
import lombok.Setter;
 
@Getter
@Setter
public class ChatMessage {
 
    private String from;
    private String text;
    private String recipient;
    private String time;
 
    public ChatMessage() {
 
    }
 
    public ChatMessage(String from, String text, String recipient) {
        this.from = from;
        this.text = text;
        this.recipient = recipient;
        this.time = StringUtils.getCurrentTimeStamp();
    }
}

工具类

package com.wsl.websocket.util;
 
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
 
public class StringUtils {
    
    private static final String TIME_FORMATTER= "HH:mm:ss";
    
    public static String getCurrentTimeStamp() {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(TIME_FORMATTER);
        return LocalDateTime.now().format(formatter);
    }
}

Controller

package com.wsl.websocket.controller;
 
import com.wsl.websocket.dto.ChatMessage;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
 
@Controller
public class WebSocketBroadcastController {
 
    @GetMapping("/stomp-broadcast")
    public String getWebSocketBroadcast() {
        return "stomp-broadcast";
    }
 
    @GetMapping("/sockjs-broadcast")
    public String getWebSocketWithSockJsBroadcast() {
        return "sockjs-broadcast";
    }
 
    @MessageMapping("/broadcast")
    @SendTo("/topic/broadcast")
    public ChatMessage send(ChatMessage chatMessage) {
        return new ChatMessage(chatMessage.getFrom(), chatMessage.getText(), "ALL");
    }
}

common.html

src/main/resources/templates/common.html

<!DOCTYPE HTML>
<html xmlns:th="http://www.thymeleaf.org">
<head th:fragment="headerfiles">
    <meta charset="UTF-8">
    <meta >
    <link rel="stylesheet" type="text/css" th:href="@{/webjars/bootstrap/4.4.1/css/bootstrap.css}"/>
    <link rel="stylesheet" type="text/css" th:href="@{/css/main.css}"/>
    <script th:src="@{/webjars/jquery/3.4.1/jquery.js}"></script>
</head>
<body>
<footer th:fragment="footer" class="my-5 text-muted text-center text-small">
    <p class="mb-1">© 2020 Dariawan</p>
    <ul class="list-inline">
        <li class="list-inline-item"><a href="https://www.dariawan.com">Homepage</a></li>
        <li class="list-inline-item"><a href="#">Articles</a></li>
    </ul>
</footer>
</body>
</html>

stomp-broadcast.html

src/main/resources/templates/stomp-broadcast.html

<!DOCTYPE HTML>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <title>WebSocket With STOMP Broadcast Example</title>
    <th:block th:include="common.html :: headerfiles"></th:block>
</head>
<body>
<div class="container">
    <div class="py-5 text-center">
        <a href="/"><h2>WebSocket</h2></a>
        <p class="lead">WebSocket Broadcast - with STOMP</p>
    </div>
    <div class="row">
        <div class="col-md-6">
            <div class="mb-3">
                <div class="input-group">
                    <input type="text" id="from" class="form-control" placeholder="Choose a nickname"/>
                    <div class="btn-group">
                        <button type="button" id="connect" class="btn btn-sm btn-outline-secondary" onclick="connect()">
                            Connect
                        </button>
                        <button type="button" id="disconnect" class="btn btn-sm btn-outline-secondary"
                                onclick="disconnect()" disabled>Disconnect
                        </button>
                    </div>
                </div>
            </div>
            <div class="mb-3">
                <div class="input-group" id="sendmessage" style="display: none;">
                    <input type="text" id="message" class="form-control" placeholder="Message">
                    <div class="input-group-append">
                        <button id="send" class="btn btn-primary" onclick="send()">Send</button>
                    </div>
                </div>
            </div>
        </div>
        <div class="col-md-6">
            <div id="content"></div>
            <div>
                        <span class="float-right">
                            <button id="clear" class="btn btn-primary" onclick="clearBroadcast()"
                                    style="display: none;">Clear</button>
                        </span>
            </div>
        </div>
    </div>
</div>
 
<footer th:insert="common.html :: footer"></footer>
 
<script th:src="@{/webjars/stomp-websocket/2.3.3-1/stomp.js}" type="text/javascript"></script>
<script type="text/javascript">
    var stompClient = null;
    var userName = $("#from").val();
 
    function setConnected(connected) {
        $("#from").prop("disabled", connected);
        $("#connect").prop("disabled", connected);
        $("#disconnect").prop("disabled", !connected);
        if (connected) {
            $("#sendmessage").show();
        } else {
            $("#sendmessage").hide();
        }
    }
 
    function connect() {
        userName = $("#from").val();
        if (userName == null || userName === "") {
            alert('Please input a nickname!');
            return;
        }
        /*<![CDATA[*/
        var url = /*[['ws://'+${#httpServletRequest.serverName}+':'+${#httpServletRequest.serverPort}+@{/broadcast}]]*/ 'ws://localhost:8080/broadcast';
        /*]]>*/
        stompClient = Stomp.client(url);
        stompClient.connect({}, function () {
            stompClient.subscribe('/topic/broadcast', function (output) {
                showBroadcastMessage(createTextNode(JSON.parse(output.body)));
            });
 
            sendConnection(' connected to server');
            setConnected(true);
        }, function (err) {
            alert('error' + err);
        });
    }
 
    function disconnect() {
        if (stompClient != null) {
            sendConnection(' disconnected from server');
 
            stompClient.disconnect(function () {
                console.log('disconnected...');
                setConnected(false);
            });
        }
    }
 
    function sendConnection(message) {
        var text = userName + message;
        sendBroadcast({'from': 'server', 'text': text});
    }
 
    function sendBroadcast(json) {
        stompClient.send("/app/broadcast", {}, JSON.stringify(json));
    }
 
    function send() {
        var text = $("#message").val();
        sendBroadcast({'from': userName, 'text': text});
        $("#message").val("");
    }
 
    function createTextNode(messageObj) {
        return '<div class="row alert alert-info"><div class="col-md-8">' +
            messageObj.text +
            '</div><div class="col-md-4 text-right"><small>[<b>' +
            messageObj.from +
            '</b> ' +
            messageObj.time +
            ']</small>' +
            '</div></div>';
    }
 
    function showBroadcastMessage(message) {
        $("#content").html($("#content").html() + message);
        $("#clear").show();
    }
 
    function clearBroadcast() {
        $("#content").html("");
        $("#clear").hide();
    }
</script>
</body>
</html>

运行效果 

http://localhost:8080/stomp-broadcast

完整代码地址

 GitHub - wangsilingwsl/springboot-stomp: springboot integrates stomp

指定消息的目标用户

此功能基于 Spring Boot,和上面代码分隔开,没有关联关系。请结合实际情况参考下列代码。

配置类

package com.twqc.config.websocket;
 
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
 
/**
 * WebSocket消息代理配置类
 * <p>
 * 注解@EnableWebSocketMessageBroker表示启用WebSocket消息代理功能。不开启不能使用@MessageMapping和@SendTo注解。
 * 注意:@MessageMapping和@SendTo的使用方法:
 * <p>
 * 注解@MessageMapping的方法用于接收客户端发送的消息,方法的参数用于接收消息内容,方法的返回值用于发送消息内容。
 * 注解@SendTo的方法用于发送消息内容,方法的返回值用于发送消息内容。
 * <p>
 * 示例:@MessageMapping("/example")注解的方法用于接收客户端发送的消息,@SendTo("/topic/example")注解的方法用于发送消息内容。
 * 3.1 对应的客户端连接websocket的路径为:ws://localhost:8080/example
 * 3.2 对应的客户端发送消息的路径为:/app/example
 * 3.3 对应的客户端接收消息的路径为:/topic/example
 * 3.4 app和topic在WebSocketMessageBrokerConfigurer.configureMessageBroker方法中配置
 * 3.5 具体的路径需要自己定义,上文仅为示例,与本项目中使用的路径无关。
 * <p>
 *
 * @author wsl
 * @date 2024/2/29
 * @see WebSocketMessageBrokerConfigurer
 * @see MessageMapping
 * @see SendTo
 * @see <a href="https://www.dariawan.com/tutorials/spring/spring-boot-websocket-stomp-tutorial/">Spring Boot WebSocket with STOMP Tutorial</a>
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
 
    /**
     * 配置消息代理
     *
     * @param config 消息代理配置注册器
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // 设置消息代理的目的地前缀,所有以"/websocket/topic"开头的消息都会被代理发送给订阅了该目的地的客户端
        config.enableSimpleBroker("/websocket/topic");
        // 设置应用的目的地前缀,所有以"/websocket/app"开头的消息都会被路由到带有@MessageMapping注解的方法中进行处理
        config.setApplicationDestinationPrefixes("/websocket/app");
        // 设置用户的目的地前缀,所有以"/user"开头的消息都会被代理发送给订阅了该目的地的用户
        config.setUserDestinationPrefix("/user");
    }
 
    /**
     * 注册消息端点
     * 可以注册多个消息端点,每个消息端点对应一个URL路径。
     *
     * @param registry 消息端点注册器
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注册消息端点,参数为消息端点的URL路径(对应@MessageMapping注解的路径,也是客户端连接的路径)
        registry.addEndpoint("/websocket/bpm/runFlow")
                // 设置允许的跨域请求来源
                .setAllowedOrigins("*");
    }
 
    /**
     * 配置客户端入站通道
     *
     * @param registration 客户端入站通道注册器
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // 设置客户端入站通道的自定义拦截器
        registration.interceptors(new MyWebSocketInterceptor());
    }
}

拦截器

package com.twqc.config.websocket;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
 
import java.util.List;
 
/**
 * WebSocket拦截器
 *
 * @author wsl
 * @date 2024/3/4
 */
@Slf4j
public class MyWebSocketInterceptor implements ChannelInterceptor {
 
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
 
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            List<String> nativeHeaders = accessor.getNativeHeader("username");
            if (nativeHeaders != null) {
                String username = nativeHeaders.get(0);
                // 存入Principal
                accessor.setUser(() -> username);
                log.info("用户:{}发起了stomp连接", username);
            } else {
                log.info("未提供用户名的stomp连接");
                return message;
            }
        }
 
        return message;
    }
}

用户的消息推送有两种实现方式

@SendToUser

@MessageMapping("/websocket/bpm/runFlow")
    @SendToUser("/websocket/topic/websocket/bpm/runFlow")
    public String runFlow2(Principal principal) {
        if (principal == null || principal.getName() == null) {
            log.error("未提供用户名的stomp连接,无法运行流程");
        }
        String username = principal.getName();
        return "success" + username;
    }

SimpMessagingTemplate

@MessageMapping("/websocket/bpm/runFlow")
    public void runFlow(FlowRequest request, Principal principal) {
        if (principal == null || principal.getName() == null) {
            log.error("未提供用户名的stomp连接,无法运行流程");
        }
        String username = principal.getName();
        flowService.runFlow(request, username);
    }

 flowService

@Autowired
    private SimpMessagingTemplate messagingTemplate;
 
    private void sendNodeResult(FlowResponse response, String username) {
        messagingTemplate.convertAndSendToUser(username, BpmConstant.Flow.TOPIC_FLOW_RESULTS, response);
    }

前端(客户端)

<template>
  <div id="app">
    <!-- 发送消息表单 -->
    <van-form @submit="onSubmit">
      <van-field
        v-model="content"
        
        label="内容"
        rows="3"
        autosize
        type="textarea"
        placeholder="请输入内容"
      />
      <div style="margin: 16px">
        <van-button round block type="info" native-type="submit"
          >提交</van-button
        >
      </div>
    </van-form>
 
    <!-- 消息回复体 -->
    <van-cell-group>
      <van-cell
        v-for="(msgItem, msgIndex) in msgList"
        :key="msgIndex"
        :title="'回复消息' + (msgIndex + 1)"
        value=""
        :label="msgItem"
      />
    </van-cell-group>
  </div>
</template>
 
<script>
import Stomp from "stompjs";
let socketTimer = null;
 
export default {
  name: "App",
  created() {
    this.username = "admin";
    this.initWebsocket();
  },
  data() {
    return {
      content: "",
      stompClient: null,
      msgList: [],
      username: "admin",
    };
  },
  methods: {
    initWebsocket() {
      this.stompClient = Stomp.client(
        "ws://192.168.1.109:7010/websocket/bpm/runFlow"
      );
      this.stompClient.debug = null;
      const headers = {
        username: this.username,
      };
      this.stompClient.connect(
        headers, // 自定义请求头
        () => {
          this.stompClient.subscribe(
            "/user/websocket/topic/websocket/bpm/runFlow",
            (res) => {
              this.msgList.push(JSON.stringify(res));
            }
          );
        },
        (err) => {
          console.log("err", err);
          this.$toast("连接失败:" + JSON.stringify(err));
          // 监听报错信息,手动发起重连
          if (socketTimer) {
            clearInterval(socketTimer);
          }
          // 10s后重新连接一次
          socketTimer = setTimeout(() => {
            this.initWebsocket();
          }, 10000);
        }
      );
      // this.stompClient.heartbeat.outgoing = 10000;
      // 若使用STOMP 1.1 版本,默认开启了心跳检测机制(默认值都是10000ms)
      // this.stompClient.heartbeat.incoming = 0;
      // 客户端不从服务端接收心跳包
    },
    closeWebsocket() {
      if (this.stompClient !== null) {
        this.stompClient.disconnect(() => {
          console.log("关闭连接");
        });
      }
    },
    onSubmit() {
      // 发送信息
      // 转成json对象
      this.stompClient.send(
        "/websocket/app/websocket/bpm/runFlow",
        {},
        // JSON.stringify({ content: this.content })
        this.content
      );
    },
  },
  destroyed() {
    // 页面销毁后记得关闭定时器
    clearInterval(socketTimer);
    this.closeWebsocket();
  },
};
</script>
 
<style>
#app {
  font-family: Avenir, Helvetica, Arial, sans-serif;
  -webkit-font-smoothing: antialiased;
  -moz-osx-font-smoothing: grayscale;
  color: #2c3e50;
}
</style>

 请注意,客户端接收信号时,需要在订阅地址前加上 / app,否则接收失败。

完整代码地址

GitHub - wangsilingwsl/vue-stomp

参考:Spring Boot + WebSocket With STOMP Tutorial | Dariawan

标签: #软件开发 1171 #JAVA 991
相关文章

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

SpringBoot整合异步任务执行 2024-10-08 11:24

同步任务: 同步任务是在单线程中按顺序执行,每次只有一个任务在执行,不会引发线程安全和数据一致性等 并发问题 同步任务需要等待任务执行完成后才能执行下一个任务,无法同时处理多个任务,响应慢,影响用 户体验 异步任务: 异步任务是在多线程中同时执行,多个任务可以并发执行,同时处理多个请求,响应快,资源

springboot kafka多数据源,通过配置动态加载发送者和消费者 2024-10-08 11:24

前言 最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是如果需要不修改代码扩展呢,因为kafka本身不处理额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。 准备test kafka本身非常容易上手,如果我们需要单元测试,引入ja

SpringBoot 集成 Redis 2024-10-08 11:24

一:SpringBoot 集成 Redis ①Redis是一个 NoSQL(not only)数据库, 常作用缓存 Cache 使用。 ②Redis是一个中间件、是一个独立的服务器;常用的数据类型: string , hash ,set ,zset , list ③通过Redis客户端可以使用多种语

SpringBoot整合QQ邮箱 2024-10-08 11:24

SpringBoot可以通过导入依赖的方式集成多种技术,这当然少不了我们常用的邮箱,现在本章演示SpringBoot整合QQ邮箱发送邮件…. 下面按步骤进行: 1.获取QQ邮箱授权码 1.1 登录QQ邮箱 1.2 开启SMTP服务 找到下图中的SMTP服务区域,如果当前账号未开启的话自己手动开启。

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.