123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- package com.gyee.alarm.websocket;
- import com.gyee.alarm.util.StringUtils;
- import org.apache.logging.log4j.LogManager;
- import org.apache.logging.log4j.Logger;
- import org.springframework.stereotype.Component;
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.util.concurrent.ConcurrentHashMap;
- /**
- * 功能描述:
- * WebSocketServer服务端
- * @Date: 2022-12-01 09:41:32
- * @since: 1.0.0
- */
- // @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端。注解的值将被用于监听用户连接的终端访问URL地址
- // encoders = WebSocketCustomEncoding.class 是为了使用ws自己的推送Object消息对象(sendObject())时进行解码,通过Encoder 自定义规则(转换为JSON字符串)
- @ServerEndpoint(value = "/websocket/{userId}",encoders = WebSocketCustomEncoding.class)
- @Component
- public class WebSocket {
- private final static Logger logger = LogManager.getLogger(WebSocket.class);
- /**
- * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的
- */
- private static int onlineCount = 0;
- /**
- * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象
- */
- public static ConcurrentHashMap<String, WebSocket> webSocketMap = new ConcurrentHashMap<>();
- /***
- * 功能描述:
- * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象的参数体
- */
- public static ConcurrentHashMap<String, PushParams> webSocketParamsMap = new ConcurrentHashMap<>();
- /**
- * 与某个客户端的连接会话,需要通过它来给客户端发送数据
- */
- private Session session;
- private String userId;
- /**
- * 连接建立成功调用的方法
- * onOpen 和 onClose 方法分别被@OnOpen和@OnClose 所注解。他们定义了当一个新用户连接和断开的时候所调用的方法。
- */
- @OnOpen
- public void onOpen(Session session, @PathParam("userId") String userId) {
- this.session = session;
- this.userId = userId;
- //加入map
- webSocketMap.put(userId, this);
- addOnlineCount(); //在线数加1
- logger.info("用户{}连接成功,当前在线人数为{}", userId, getOnlineCount());
- try {
- sendMessage(String.valueOf(this.session.getQueryString()));
- } catch (IOException e) {
- logger.error("IO异常");
- }
- }
- /**
- * 连接关闭调用的方法
- */
- @OnClose
- public void onClose() {
- //从map中删除
- webSocketMap.remove(userId);
- subOnlineCount(); //在线数减1
- logger.info("用户{}关闭连接!当前在线人数为{}", userId, getOnlineCount());
- }
- /**
- * 收到客户端消息后调用的方法
- * onMessage 方法被@OnMessage所注解。这个注解定义了当服务器接收到客户端发送的消息时所调用的方法。
- * @param message 客户端发送过来的消息
- */
- @OnMessage
- public void onMessage(String message, Session session) throws IOException {
- logger.info("来自客户端用户:{} 消息:{}",userId, message);
- sendMessageByUserId(userId,"ok");
- //群发消息
- /*for (String item : webSocketMap.keySet()) {
- try {
- webSocketMap.get(item).sendMessage(message);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }*/
- }
- /**
- * 发生错误时调用
- *
- * @OnError
- */
- @OnError
- public void onError(Session session, Throwable error) {
- logger.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
- error.printStackTrace();
- }
- /**
- * 向客户端发送消息
- */
- public void sendMessage(String message) throws IOException {
- this.session.getBasicRemote().sendText(message);
- //this.session.getAsyncRemote().sendText(message);
- }
- /**
- * 向客户端发送消息
- */
- public void sendMessage(Object message) throws IOException, EncodeException {
- if(StringUtils.notEmp(this.session))
- {
- synchronized (this.session){
- if(this.session.isOpen())
- {
- this.session.getBasicRemote().sendObject(message);
- }
- }
- }
- //this.session.getAsyncRemote().sendText(message);
- }
- /**
- * 通过userId向客户端发送消息
- */
- public void sendMessageByUserId(String userId, String message) throws IOException {
- logger.info("服务端发送消息到{},消息:{}",userId,message);
- if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
- synchronized ( webSocketMap.get(userId)) {
- webSocketMap.get(userId).sendMessage(message);
- }
- }else{
- logger.error("用户{}不在线",userId);
- }
- }
- /**
- * 通过userId向客户端发送消息
- */
- public void sendMessageByUserId(String userId, Object message) throws IOException, EncodeException {
- logger.info("服务端发送消息到{},消息:{}",userId,message);
- if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
- webSocketMap.get(userId).sendMessage(message);
- }else{
- logger.error("用户{}不在线",userId);
- }
- }
- /**
- * 通过userId更新缓存的参数
- */
- public void changeParamsByUserId(String userId, PushParams pushParams) throws IOException, EncodeException {
- logger.info("ws用户{}请求参数更新,参数:{}",userId,pushParams.toString());
- webSocketParamsMap.put(userId,pushParams);
- }
- /**
- * 群发自定义消息
- */
- public static void sendInfo(String message) throws IOException {
- for (String item : webSocketMap.keySet()) {
- try {
- webSocketMap.get(item).sendMessage(message);
- } catch (IOException e) {
- continue;
- }
- }
- }
- public static synchronized int getOnlineCount() {
- return onlineCount;
- }
- public static synchronized void addOnlineCount() {
- WebSocket.onlineCount++;
- }
- public static synchronized void subOnlineCount() {
- WebSocket.onlineCount--;
- }
- }
|