锋盈数科-知识库 Logo
首页
软件开发
计算机基础
Hello Halo
新手必读
关于本知识库
登录 →
锋盈数科-知识库 Logo
首页 软件开发 计算机基础 Hello Halo 新手必读 关于本知识库
登录
  1. 首页
  2. 软件开发
  3. JAVA
  4. SpringBoot+WebSocket 消息推送 校验 心跳机制 PING-PONG 用户分组等

SpringBoot+WebSocket 消息推送 校验 心跳机制 PING-PONG 用户分组等

0
  • JAVA
  • 发布于 2024-08-15
  • 0 次阅读
黄健
黄健

本文由 简悦 SimpRead 转码, 原文地址 blog.csdn.net

前言:

        WebSocket PING-PONG 心跳机制,只需要服务端发送 PING,客户端会自动回应 PONG,本文中使用了两个 @OnMassage 注解一个用于接收 Text 消息,一个用于接收 PONG 响应消息,此外还有二进制格式(InputStream ,byte[],ByteBuffer 等)。

说明:      

        记录一下,自己使用的 WebSocket 方式。

        性能可能不是最优,也有可能有其他隐患。

        (作者逻辑可能也点问题,有大佬发现问题还请不用口下留情!)

一、引入依赖

 还有 Lombok 等自行导入

<!-- websocket -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

二、创建 WebSocket 配置类

@Configuration
public class WebSocketConfig {
 
    /**
     * ServerEndpointExporter类的作用是,会扫描所有的服务器端点,
     * 把带有@ServerEndpoint 注解的所有类都添加进来
     *
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
 
}

三、创建 WebSocket 服务类

这类 token 并没有做过期相关的处理,可以根据个人需求添加 

/**
 * webSocket服务
 */
@Slf4j
@Component
@ServerEndpoint("/wsserver/{groupId}/{userId}")
public class WebSocketServer {
 
    private static WebSocketGroupManager groupManager;
    private ScheduledExecutorService executor= Executors.newSingleThreadScheduledExecutor();
    @Autowired
    public void setWebSocketGroupManager(WebSocketGroupManager manager) {
        groupManager = manager;
    }
 
    @OnOpen
    public void onOpen(Session session, @PathParam("groupId") String groupId, @PathParam("userId") String userId) {
        try {
            //包含token校验
            String queryString = session.getQueryString();
            Map<String, List<String>> queryParams = decodeQueryString(queryString);
            // 获取token参数
            List<String> tokenValues = queryParams.get("token");
            if (tokenValues != null && !tokenValues.isEmpty()) {
                //根据自己的param顺序,实现不同的业务逻辑
                String token = tokenValues.get(0);
                // 进行校验操作
                if (isValidToken(token)) {
                    WebSocketGroup group = groupManager.getOrCreateGroup(groupId);
                    WebSocketUser user = new WebSocketUser(session, userId,token);
                    group.addUser(user);
                    // 更新在线用户计数器
                    int count = groupManager.increaseOnlineCount();
 
                    session.getAsyncRemote().sendPing(ByteBuffer.wrap(new byte[0]));
 
                    log.info("校验通过!用户:{} 上线,当前在线人数:{},分组:{},分组在线人数:{}", userId, count, groupId,group != null ? group.getUserCount() : 0);
                    // 校验通过,进行其他操作
                } else {
                    // 校验失败,关闭连接
                    closeSession(session,groupId,userId);
                    log.warn("校验失败!用户:{} token 错误!",userId);
                }
            } else {
                // 没有提供token参数,直接过滤,关闭连接
                closeSession(session,groupId,userId);
            }
        } catch (Exception e) {
            log.error("用户:{} ,连接时发送异常!异常信息:{}", userId, e.getMessage());
            closeSession(session, groupId, userId);
        }
    }
 
    // 接收普通消息
    @OnMessage
    public void onMessage(String message, Session session, @PathParam("groupId") String groupId, @PathParam("userId") String userId) {
        WebSocketGroup group = groupManager.getGroup(groupId);
        if (group != null) {
            group.sendMessageToAllUsers(message);
        }
    }
 
    // 接收心跳消息
    @OnMessage
    public void onPong(PongMessage pong, Session session, @PathParam("groupId") String groupId, @PathParam("userId") String userId) {
        executor.schedule(() -> {
            try {
                // 发送空的Ping消息
                session.getAsyncRemote().sendPing(ByteBuffer.wrap(new byte[0]));
            } catch (IOException e) {
                // 处理发送失败的情况
                log.error("Ping 用户:{} 心跳异常,关闭会话,错误原因:{}", userId, e.getMessage());
                closeSession(session, groupId, userId);
            }
        }, 30, TimeUnit.SECONDS);
    }
 
    @OnClose
    public void onClose(@PathParam("groupId") String groupId, @PathParam("userId") String userId, Session session) {
        try {
            WebSocketGroup group = groupManager.getGroup(groupId);
            if (group != null) {
                group.removeUser(userId);
                // 检查分组的用户数量,如果为0,则从分组管理器中删除分组对象
                if (group.getUserCount() == 0) {
                    groupManager.removeGroup(groupId);
                }
            }
            // 更新在线用户计数器
            int count = groupManager.decreaseOnlineCount();
 
            log.info("用户:{} 退出,当前在线人数:{},分组:{},分组在线人数:{}", userId, count, groupId, group != null ? group.getUserCount() : 0);
        } catch (Exception e) {
            log.error("连接关闭时异常!用户:{},分组:{},错误原因:{}", userId, groupId, e.getMessage());
            closeSession(session, groupId, userId);
        }
    }
 
    @OnError
    public void onError(Throwable throwable, @PathParam("groupId") String groupId, @PathParam("userId") String userId, Session session) {
        // 向客户端发送错误消息
        session.getAsyncRemote().sendText("发生了错误,请稍后再试。");
        log.error("连接异常!用户:{},分组:{},错误原因:{}", userId, groupId, throwable.getMessage());
        closeSession(session, groupId, userId);
    }
 
    /**
     * 关闭Session
     *
     * @param session
     */
    private void closeSession(Session session, String groupId, String userId) {
        // 关闭连接
        if (session != null && session.isOpen()) {
            //关闭后删除掉对应用户信息
            WebSocketGroup group = groupManager.getGroup(groupId);
            if (group != null) {
                group.removeUser(userId);
                // 检查分组的用户数量,如果为0,则从分组管理器中删除分组对象
                if (group.getUserCount() == 0) {
                    groupManager.removeGroup(groupId);
                }
            }
            // 更新在线用户计数器
            int count = groupManager.decreaseOnlineCount();
            try {
                session.close();
            } catch (IOException e) {
                log.error("关闭session会话时异常:{}", e.getMessage());
            }
        }
    }
 
    /**
     * 获取全部在线用户数量统计
     *
     * @return
     */
    public static int getOnlineCount() {
        return groupManager.getOnlineCount();
    }
 
    /**
     * 向所有分组的子目录下发命令。
     *
     * @param message
     * @warn 由服务器统一下发,若使用多线程,存在线程安全问题。
     */
    public static void sendMessageToAllGroups(String message) {
        groupManager.sendMessageToAllGroups(message);
    }
 
    // 校验token的方法
    private boolean isValidToken(String token) {
        // 根据自己的需求,进行校验逻辑,返回校验结果
        return true;
    }
 
    // 解码查询参数
    private Map<String, List<String>> decodeQueryString(String queryString) {
        // 根据自己的需求实现解码逻辑
        //这里做简单的解析参数。
        Map<String, List<String>> queryParams = new HashMap<>();
        String[] pairs = queryString.split("&");
        for (String pair : pairs) {
            String[] parts = pair.split("=");
            String name = parts[0];
            String value = "";
            if (parts.length > 1) {
                value = parts[1];
            }
            queryParams.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
        }
        return queryParams;
    }
}

四、创建 WebSocket 分组以及分组管理器

 分组管理器

@Slf4j
@Component
public class WebSocketGroupManager {
    private final Map<String, WebSocketGroup> groups;
 
    private AtomicInteger onlineCount;
 
    public WebSocketGroupManager() {
        this.groups = new ConcurrentHashMap<>();
        this.onlineCount = new AtomicInteger(0);
    }
 
    public void addGroup(WebSocketGroup group) {
        groups.put(group.getGroupId(), group);
    }
 
    public void removeGroup(String groupId) {
        groups.remove(groupId);
    }
 
    public WebSocketGroup getGroup(String groupId) {
        return groups.get(groupId);
    }
 
    public WebSocketGroup getOrCreateGroup(String groupId) {
        WebSocketGroup group = groups.get(groupId);
        if (group == null) {
            group = new WebSocketGroup(groupId);
            groups.put(groupId, group);
        }
        return group;
    }
 
    public void sendMessageToAllGroups(String message) {
        for (WebSocketGroup group : groups.values()) {
            group.sendMessageToAllUsers(message);
        }
    }
 
    public int getGroupUserCount(String groupId) {
        WebSocketGroup group = groups.get(groupId);
        if (group != null) {
            return group.getUserCount();
        }
        return 0;
    }
    public int getOnlineCount() {
        return onlineCount.get();
    }
 
    public int increaseOnlineCount() {
        return onlineCount.incrementAndGet();
    }
 
    public int decreaseOnlineCount() {
        return onlineCount.decrementAndGet();
    }
 
}

 分组

@Slf4j
public class WebSocketGroup {
    private String groupId;
    private Map<String, WebSocketUser> users;
    private int userCount;
 
    public WebSocketGroup(String groupId) {
        this.groupId = groupId;
        this.users =new HashMap<>();
        this.userCount = 0;
    }
 
    public void addUser(WebSocketUser user) {
        users.put(user.getUserId(), user);
 
        // 更新在线用户计数器
        userCount++;
    }
 
    public WebSocketUser getUser(String userId) {
        return users.get(userId);
    }
 
    public void removeUser(String userId) {
        if (users.containsKey(userId)) {
            WebSocketUser removedUser = users.remove(userId);
 
            // 更新在线用户计数器
            if (removedUser != null) {
                userCount--;
            }
        } else {
            // 用户不存在
 
        }
    }
 
    public int getUserCount() {
        return userCount;
    }
 
    /**
     * 向当前分组下所有用户发送信息
     * @param message
     */
    public void sendMessageToAllUsers(String message) {
        for (WebSocketUser user : users.values()) {
            user.sendMessage(message);
        }
    }
 
    public String getGroupId() {
        return groupId;
    }
}

 用户

@Slf4j
public class WebSocketUser {
    private Session session;
    private String userId;
 
    private String token;
 
    public WebSocketUser(Session session, String userId,String token) {
        this.session = session;
        this.userId = userId;
        this.token=token;
    }
    public WebSocketUser(Session session, String userId) {
        this.session = session;
        this.userId = userId;
    }
 
    public Session getSession() {
        return session;
    }
 
    public String getUserId() {
        return userId;
    }
 
    public void sendMessage(String message) {
        try {
            session.getAsyncRemote().sendText(message);
        } catch (Exception e) {
            log.error("发送消息异常!用户:{},错误原因:{}", userId, e.getMessage());
        }
    }
}
标签: #软件开发 1171 #JAVA 991
相关文章

Spring 实现 3 种异步接口 2024-10-18 09:07

大家好,我是苏三~ 如何处理比较耗时的接口? 这题我熟,直接上异步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可实现。 但这些方法有局限性,处理结果仅返回单个值。在某些场景下,如果需要接口异步处理的同时,还持续不断地

重学SpringBoot3-集成Redis(五)之布隆过滤器 2024-10-08 11:24

更多SpringBoot3内容请关注我的专栏:《SpringBoot3》 期待您的点赞👍收藏⭐评论✍ 重学SpringBoot3-集成Redis(五)之布隆过滤器 1. 什么是布隆过滤器? * 基本概念 适用场景 2. 使用 Redis 实现布隆过滤器 * 项目依赖 Redis 配置

SpringBoot整合异步任务执行 2024-10-08 11:24

同步任务: 同步任务是在单线程中按顺序执行,每次只有一个任务在执行,不会引发线程安全和数据一致性等 并发问题 同步任务需要等待任务执行完成后才能执行下一个任务,无法同时处理多个任务,响应慢,影响用 户体验 异步任务: 异步任务是在多线程中同时执行,多个任务可以并发执行,同时处理多个请求,响应快,资源

springboot kafka多数据源,通过配置动态加载发送者和消费者 2024-10-08 11:24

前言 最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是如果需要不修改代码扩展呢,因为kafka本身不处理额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。 准备test kafka本身非常容易上手,如果我们需要单元测试,引入ja

SpringBoot 集成 Redis 2024-10-08 11:24

一:SpringBoot 集成 Redis ①Redis是一个 NoSQL(not only)数据库, 常作用缓存 Cache 使用。 ②Redis是一个中间件、是一个独立的服务器;常用的数据类型: string , hash ,set ,zset , list ③通过Redis客户端可以使用多种语

SpringBoot整合QQ邮箱 2024-10-08 11:24

SpringBoot可以通过导入依赖的方式集成多种技术,这当然少不了我们常用的邮箱,现在本章演示SpringBoot整合QQ邮箱发送邮件…. 下面按步骤进行: 1.获取QQ邮箱授权码 1.1 登录QQ邮箱 1.2 开启SMTP服务 找到下图中的SMTP服务区域,如果当前账号未开启的话自己手动开启。

目录

IT 外包服务商

  • 意见投递
  • zyf6619

软件开发应用

主菜单

  • 首页
  • 软件开发
  • 计算机基础
  • Hello Halo
  • 新手必读
  • 关于本知识库
Copyright © 2024 your company All Rights Reserved. Powered by Halo.