WebSocket.java 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package com.gyee.alarm.websocket;
  2. import com.gyee.alarm.util.StringUtils;
  3. import org.apache.logging.log4j.LogManager;
  4. import org.apache.logging.log4j.Logger;
  5. import org.springframework.stereotype.Component;
  6. import javax.websocket.*;
  7. import javax.websocket.server.PathParam;
  8. import javax.websocket.server.ServerEndpoint;
  9. import java.io.IOException;
  10. import java.util.concurrent.ConcurrentHashMap;
  11. /**
  12. * 功能描述:
  13. * WebSocketServer服务端
  14. * @Date: 2022-12-01 09:41:32
  15. * @since: 1.0.0
  16. */
  17. // @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端。注解的值将被用于监听用户连接的终端访问URL地址
  18. // encoders = WebSocketCustomEncoding.class 是为了使用ws自己的推送Object消息对象(sendObject())时进行解码,通过Encoder 自定义规则(转换为JSON字符串)
  19. @ServerEndpoint(value = "/websocket/{userId}",encoders = WebSocketCustomEncoding.class)
  20. @Component
  21. public class WebSocket {
  22. private final static Logger logger = LogManager.getLogger(WebSocket.class);
  23. /**
  24. * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的
  25. */
  26. private static int onlineCount = 0;
  27. /**
  28. * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象
  29. */
  30. public static ConcurrentHashMap<String, WebSocket> webSocketMap = new ConcurrentHashMap<>();
  31. /***
  32. * 功能描述:
  33. * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象的参数体
  34. */
  35. public static ConcurrentHashMap<String, PushParams> webSocketParamsMap = new ConcurrentHashMap<>();
  36. /**
  37. * 与某个客户端的连接会话,需要通过它来给客户端发送数据
  38. */
  39. private Session session;
  40. private String userId;
  41. /**
  42. * 连接建立成功调用的方法
  43. * onOpen 和 onClose 方法分别被@OnOpen和@OnClose 所注解。他们定义了当一个新用户连接和断开的时候所调用的方法。
  44. */
  45. @OnOpen
  46. public void onOpen(Session session, @PathParam("userId") String userId) {
  47. this.session = session;
  48. this.userId = userId;
  49. //加入map
  50. webSocketMap.put(userId, this);
  51. addOnlineCount(); //在线数加1
  52. logger.info("用户{}连接成功,当前在线人数为{}", userId, getOnlineCount());
  53. try {
  54. sendMessage(String.valueOf(this.session.getQueryString()));
  55. } catch (IOException e) {
  56. logger.error("IO异常");
  57. }
  58. }
  59. /**
  60. * 连接关闭调用的方法
  61. */
  62. @OnClose
  63. public void onClose() {
  64. //从map中删除
  65. webSocketMap.remove(userId);
  66. subOnlineCount(); //在线数减1
  67. logger.info("用户{}关闭连接!当前在线人数为{}", userId, getOnlineCount());
  68. }
  69. /**
  70. * 收到客户端消息后调用的方法
  71. * onMessage 方法被@OnMessage所注解。这个注解定义了当服务器接收到客户端发送的消息时所调用的方法。
  72. * @param message 客户端发送过来的消息
  73. */
  74. @OnMessage
  75. public void onMessage(String message, Session session) throws IOException {
  76. logger.info("来自客户端用户:{} 消息:{}",userId, message);
  77. sendMessageByUserId(userId,"ok");
  78. //群发消息
  79. /*for (String item : webSocketMap.keySet()) {
  80. try {
  81. webSocketMap.get(item).sendMessage(message);
  82. } catch (IOException e) {
  83. e.printStackTrace();
  84. }
  85. }*/
  86. }
  87. /**
  88. * 发生错误时调用
  89. *
  90. * @OnError
  91. */
  92. @OnError
  93. public void onError(Session session, Throwable error) {
  94. logger.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
  95. error.printStackTrace();
  96. }
  97. /**
  98. * 向客户端发送消息
  99. */
  100. public void sendMessage(String message) throws IOException {
  101. this.session.getBasicRemote().sendText(message);
  102. //this.session.getAsyncRemote().sendText(message);
  103. }
  104. /**
  105. * 向客户端发送消息
  106. */
  107. public void sendMessage(Object message) throws IOException, EncodeException {
  108. if(StringUtils.notEmp(this.session))
  109. {
  110. synchronized (this.session){
  111. if(this.session.isOpen())
  112. {
  113. this.session.getBasicRemote().sendObject(message);
  114. }
  115. }
  116. }
  117. //this.session.getAsyncRemote().sendText(message);
  118. }
  119. /**
  120. * 通过userId向客户端发送消息
  121. */
  122. public void sendMessageByUserId(String userId, String message) throws IOException {
  123. logger.info("服务端发送消息到{},消息:{}",userId,message);
  124. if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
  125. synchronized ( webSocketMap.get(userId)) {
  126. webSocketMap.get(userId).sendMessage(message);
  127. }
  128. }else{
  129. logger.error("用户{}不在线",userId);
  130. }
  131. }
  132. /**
  133. * 通过userId向客户端发送消息
  134. */
  135. public void sendMessageByUserId(String userId, Object message) throws IOException, EncodeException {
  136. logger.info("服务端发送消息到{},消息:{}",userId,message);
  137. if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
  138. webSocketMap.get(userId).sendMessage(message);
  139. }else{
  140. logger.error("用户{}不在线",userId);
  141. }
  142. }
  143. /**
  144. * 通过userId更新缓存的参数
  145. */
  146. public void changeParamsByUserId(String userId, PushParams pushParams) throws IOException, EncodeException {
  147. logger.info("ws用户{}请求参数更新,参数:{}",userId,pushParams.toString());
  148. webSocketParamsMap.put(userId,pushParams);
  149. }
  150. /**
  151. * 群发自定义消息
  152. */
  153. public static void sendInfo(String message) throws IOException {
  154. for (String item : webSocketMap.keySet()) {
  155. try {
  156. webSocketMap.get(item).sendMessage(message);
  157. } catch (IOException e) {
  158. continue;
  159. }
  160. }
  161. }
  162. public static synchronized int getOnlineCount() {
  163. return onlineCount;
  164. }
  165. public static synchronized void addOnlineCount() {
  166. WebSocket.onlineCount++;
  167. }
  168. public static synchronized void subOnlineCount() {
  169. WebSocket.onlineCount--;
  170. }
  171. }