package com.gyee.alarm.websocket; import com.gyee.alarm.util.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; /** * 功能描述: * WebSocketServer服务端 * @Date: 2022-12-01 09:41:32 * @since: 1.0.0 */ // @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端。注解的值将被用于监听用户连接的终端访问URL地址 // encoders = WebSocketCustomEncoding.class 是为了使用ws自己的推送Object消息对象(sendObject())时进行解码,通过Encoder 自定义规则(转换为JSON字符串) @ServerEndpoint(value = "/websocket/{userId}",encoders = WebSocketCustomEncoding.class) @Component public class WebSocket { private final static Logger logger = LogManager.getLogger(WebSocket.class); /** * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的 */ private static int onlineCount = 0; /** * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象 */ public static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); /*** * 功能描述: * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象的参数体 */ public static ConcurrentHashMap webSocketParamsMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; private String userId; /** * 连接建立成功调用的方法 * onOpen 和 onClose 方法分别被@OnOpen和@OnClose 所注解。他们定义了当一个新用户连接和断开的时候所调用的方法。 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId = userId; //加入map webSocketMap.put(userId, this); addOnlineCount(); //在线数加1 logger.info("用户{}连接成功,当前在线人数为{}", userId, getOnlineCount()); try { sendMessage(String.valueOf(this.session.getQueryString())); } catch (IOException e) { logger.error("IO异常"); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { //从map中删除 webSocketMap.remove(userId); subOnlineCount(); //在线数减1 logger.info("用户{}关闭连接!当前在线人数为{}", userId, getOnlineCount()); } /** * 收到客户端消息后调用的方法 * onMessage 方法被@OnMessage所注解。这个注解定义了当服务器接收到客户端发送的消息时所调用的方法。 * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) throws IOException { logger.info("来自客户端用户:{} 消息:{}",userId, message); sendMessageByUserId(userId,"ok"); //群发消息 /*for (String item : webSocketMap.keySet()) { try { webSocketMap.get(item).sendMessage(message); } catch (IOException e) { e.printStackTrace(); } }*/ } /** * 发生错误时调用 * * @OnError */ @OnError public void onError(Session session, Throwable error) { logger.error("用户错误:" + this.userId + ",原因:" + error.getMessage()); error.printStackTrace(); } /** * 向客户端发送消息 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); //this.session.getAsyncRemote().sendText(message); } /** * 向客户端发送消息 */ public void sendMessage(Object message) throws IOException, EncodeException { if(StringUtils.notEmp(this.session)) { synchronized (this.session){ if(this.session.isOpen()) { this.session.getBasicRemote().sendObject(message); } } } //this.session.getAsyncRemote().sendText(message); } /** * 通过userId向客户端发送消息 */ public void sendMessageByUserId(String userId, String message) throws IOException { logger.info("服务端发送消息到{},消息:{}",userId,message); if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){ synchronized ( webSocketMap.get(userId)) { webSocketMap.get(userId).sendMessage(message); } }else{ logger.error("用户{}不在线",userId); } } /** * 通过userId向客户端发送消息 */ public void sendMessageByUserId(String userId, Object message) throws IOException, EncodeException { logger.info("服务端发送消息到{},消息:{}",userId,message); if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){ webSocketMap.get(userId).sendMessage(message); }else{ logger.error("用户{}不在线",userId); } } /** * 通过userId更新缓存的参数 */ public void changeParamsByUserId(String userId, PushParams pushParams) throws IOException, EncodeException { logger.info("ws用户{}请求参数更新,参数:{}",userId,pushParams.toString()); webSocketParamsMap.put(userId,pushParams); } /** * 群发自定义消息 */ public static void sendInfo(String message) throws IOException { for (String item : webSocketMap.keySet()) { try { webSocketMap.get(item).sendMessage(message); } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocket.onlineCount++; } public static synchronized void subOnlineCount() { WebSocket.onlineCount--; } }