本文由 简悦 SimpRead 转码, 原文地址 blog.csdn.net
引入依赖
Spring Boot 中的 WebSocket 依赖于 Spring WebFlux 模块,使用了 Reactor Netty 库来实现底层的 WebSocket 通信。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
服务端配置
/**
* WebSocket配置类
*/
@Configuration
public class WebSocketConfig {
/**
* 注入ServerEndpointExporter的bean对象,自动注册使用了@ServerEndpoint注解的bean
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
ServerEndpointExporter
在 Spring 框架中,ServerEndpointExporter的注入是 WebSocket 配置的重要部分。
为什么要注入ServerEndpointExporter对象?
-
自动注册 WebSocket 端点:
ServerEndpointExporter会扫描 Spring 应用上下文中使用了@ServerEndpoint注解的类,并自动注册这些类为 WebSocket 端点。这样,就不需要手动注册每个端点,简化了 WebSocket 端点的配置过程。 -
集成 Spring 框架: 通过
ServerEndpointExporter,Spring 框架能够更好地管理和配置 WebSocket 端点。例如,Spring 的依赖注入功能可以用于 WebSocket 端点,使得端点可以依赖 Spring 管理的 bean。
如果实现自动注册端点 (函数 registerEndpoints)?
请看下述注释:
- 创建一个 linkedhashset 集合来存储端点类.
- 从 spring 上下文中查找所有带有 serverEndPoint 的 bean 并添加到 endPointClasses 集合中
- 遍历集合, 注册每一个端点类
- 从 spring 上下文中查找所有带有 serverEndPointConfig 的 bean, 放置在集合中并注册
protected void registerEndpoints() {
// 创建一个LinkedHashSet来存储端点类,确保顺序和唯一性
Set<Class<?>> endpointClasses = new LinkedHashSet<>();
// 如果annotatedEndpointClasses不为空,将其全部添加到endpointClasses中
if (this.annotatedEndpointClasses != null) {
endpointClasses.addAll(this.annotatedEndpointClasses);
}
// 获取应用上下文
ApplicationContext context = this.getApplicationContext();
// 如果上下文不为空,从上下文中获取所有带有ServerEndpoint注解的bean的名字
if (context != null) {
String[] endpointBeanNames = context.getBeanNamesForAnnotation(ServerEndpoint.class);
// 遍历所有bean名字
for (String beanName : endpointBeanNames) {
// 将bean对应的类型添加到endpointClasses中
endpointClasses.add(context.getType(beanName));
}
}
// 遍历所有收集到的端点类
for (Class<?> endpointClass : endpointClasses) {
// 注册每个端点类
this.registerEndpoint(endpointClass);
}
// 如果上下文不为空,从上下文中获取所有ServerEndpointConfig类型的bean
if (context != null) {
Map<String, ServerEndpointConfig> endpointConfigMap = context.getBeansOfType(ServerEndpointConfig.class);
// 遍历所有ServerEndpointConfig类型的bean
for (ServerEndpointConfig endpointConfig : endpointConfigMap.values()) {
// 注册每个ServerEndpointConfig
this.registerEndpoint(endpointConfig);
}
}
}
创建 websocket 对象
/**
* 服务端WebSocket对象
*/
@ServerEndpoint(value = "/chat/{param}")
@Component
public class ChatEndpoint {
//用来存储每一个客户端对象对应的ChatEndpoint对象 ConcurrentHashMap效率高,线程安全,但是key和value都不能为null
// public static Map<String, ChatEndpoint> onlineUsers = new ConcurrentHashMap<>();
public static CopyOnWriteArraySet<ChatEndpoint> webSocketCopyOnWriteArraySet = new CopyOnWriteArraySet<>();
private String loginId;
//声明Session对象,通过该对象可以发送消息给指定的用户
private Session session;
/**
* 连接建立时被调用
* @param session
* @param param
*/
@OnOpen
public void onOpen(Session session, @PathParam("param")String param){
//将局部的session对象赋值给成员session
this.session = session;
this.loginId = param; //1_1 0是企业,1是求职者
//将当前登录用户存储到容器中
webSocketCopyOnWriteArraySet.add(this);
}
/**
* 接收到客户端发来的消息时被调用
* @param message
*/
@OnMessage
public void onMessage(String message){
try{
JSONObject messageObject = JSON.parseObject(message);
Integer userId = messageObject.getInteger("userId");
Integer enterpriseId = messageObject.getInteger("enterpriseId");
Integer status = messageObject.getInteger("status");
Integer chatId = messageObject.getInteger("chatId");
String receiver ="";
if(status==0){
receiver ="USER_"+userId;
}else{
receiver ="ENTERPRISE_"+enterpriseId;
}
// 单聊
for(ChatEndpoint chatEndpoint : webSocketCopyOnWriteArraySet){
if(chatEndpoint.loginId.equals(receiver)){
chatEndpoint.session.getBasicRemote().sendText(JSONObject.toJSONString(messageObject));
}
}
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 发送系统消息
* @param receiverId
*/
public void sendSystemMessage(String receiverId) {
try{
JSONObject messageObject = new JSONObject();
for(ChatEndpoint chatEndpoint : webSocketCopyOnWriteArraySet){
if(chatEndpoint.loginId.equals(receiverId)){
chatEndpoint.session.getBasicRemote().sendText(JSONObject.toJSONString(messageObject));
}
}
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 连接关闭时被调用
*/
@OnClose
public void onClose(){
webSocketCopyOnWriteArraySet.remove(this);
}
}
ServerEndpoint 注解
@ServerEndpoint是一个类层次的注解,它的功能是标识当前类是一个 WebSocket 服务端,注解的值将被用于监听用户连接的终端访问 URL 地址,客户端可以通过这个 URL 来连接到 WebSocket 服务器端。
生命周期回调方法
一个用 @ServerEndpoint 注解标记的类,可以定义以下几种方法来处理不同的 WebSocket 事件:
@OnOpen:在客户端打开连接时调用。@OnMessage:在接收到客户端消息时调用。@OnClose:在连接关闭时调用。@OnError:在通信过程中发生错误时调用。
session 对象
session 对象是 Java WebSocket API 中 javax.websocket.Session 类的一个实例。它表示服务器和特定客户端之间的 WebSocket 连接。通过 session 对象,可以执行各种操作,比如发送消息(getBasicRemote().sendText())给客户端、关闭连接以及访问 WebSocket 连接的属性。
CopyOnWriteArraySet & ConcurrentHashMap
CopyOnWriteArraySet 是 Java 中一个线程安全的 Set 实现,它基于 CopyOnWriteArrayList,实现了 Set 接口。
线程安全的原因
-
写时复制机制:
当执行修改操作(如添加或删除元素)时,
CopyOnWriteArraySet会复制底层的数组,创建一个新的数组并在其上进行修改操作。修改完成后,将新的数组引用替换旧的数组引用。这种机制确保了在修改操作进行期间,原有的数组不会被修改,所有正在读取操作的线程仍然可以安全地访问旧的数组。 -
读写分离:
读操作直接读取底层数组,不需要加锁,因此读操作非常高效。写操作因为会复制数组,代价较高,但因为写操作相对读操作较少,这种设计在大多数应用场景中是可以接受的。
使用场景
CopyOnWriteArraySet 适用于以下场景:
- 多线程环境中读操作远多于写操作。
- 不需要强一致性的快速读取操作。
- 需要线程安全但不希望在读操作上有任何性能损耗。