|
@@ -1,20 +1,22 @@
|
|
|
package com.gyee.frame.netty.websocket;
|
|
|
|
|
|
+import cn.hutool.log.Log;
|
|
|
+import cn.hutool.log.LogFactory;
|
|
|
+import com.gyee.frame.common.quartz.SocketTaskUtil;
|
|
|
+import com.gyee.frame.util.StringUtils;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.websocket.*;
|
|
|
+import javax.websocket.server.PathParam;
|
|
|
+import javax.websocket.server.ServerEndpoint;
|
|
|
import java.io.IOException;
|
|
|
import java.net.BindException;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.CopyOnWriteArraySet;
|
|
|
|
|
|
-import javax.websocket.*;
|
|
|
-import javax.websocket.server.PathParam;
|
|
|
-import javax.websocket.server.ServerEndpoint;
|
|
|
-
|
|
|
-import com.gyee.frame.common.quartz.SocketTaskUtil;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-import cn.hutool.log.Log;
|
|
|
-import cn.hutool.log.LogFactory;
|
|
|
-
|
|
|
/**
|
|
|
*@author:Wang Jiawen
|
|
|
*
|
|
@@ -24,17 +26,22 @@ import cn.hutool.log.LogFactory;
|
|
|
*
|
|
|
*/
|
|
|
|
|
|
-@ServerEndpoint(value = "/websocket/{pageNumber}/{functionNumber}",encoders = { ServerEncoder.class })
|
|
|
+@ServerEndpoint(value = "/websocket/{pageNumber}/{functionNumber}/{keyid}",encoders = { ServerEncoder.class })
|
|
|
@Component
|
|
|
public class WebSocketServer extends SocketTaskUtil {
|
|
|
|
|
|
static Log log=LogFactory.get(WebSocketServer.class);
|
|
|
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
|
|
|
private static int onlineCount = 0;
|
|
|
- //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象,准备弃用
|
|
|
- private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
|
|
|
- //功能名称与页面名称为key 举例 页面名称_功能名称
|
|
|
+ //concurrent包的线程安全Set,功能名称与页面名称+对象编号(风场、项目、线路、风机)为key
|
|
|
+ private static Map<String,CopyOnWriteArraySet<WebSocketServer>> webSocketMapSet = new HashMap();
|
|
|
+ //功能名称与页面名称为key 举例 页面名称_功能名称>>WebSocketServer
|
|
|
private static Map<String,WebSocketServer> webSocketMap = new HashMap();
|
|
|
+ //用来存储每个调度任务请求的数量
|
|
|
+ private static Map<String,Integer> funcationNunberMap = new HashMap();
|
|
|
+
|
|
|
+ public static Map<String, Set<String>> keyidMap = new HashMap();
|
|
|
+
|
|
|
|
|
|
//与某个客户端的连接会话,需要通过它来给客户端发送数据
|
|
|
private Session session;
|
|
@@ -45,36 +52,147 @@ public class WebSocketServer extends SocketTaskUtil {
|
|
|
private String pageNumber="";
|
|
|
//接收functionNumber
|
|
|
private String functionNumber="";
|
|
|
+
|
|
|
+ //接收风场编号、接收项目编号、收线路编号、接收风机编号编号
|
|
|
+ private String keyid="";
|
|
|
+ //页数和功能数组合key
|
|
|
+ private String jobkey="";
|
|
|
+ //页数和功能数组合+对象编号(风场、项目、线路、风机)
|
|
|
+ private String pfkey="";
|
|
|
/**
|
|
|
* 连接建立成功调用的方法*/
|
|
|
@OnOpen
|
|
|
- public void onOpen(Session session,@PathParam("pageNumber") String pageNumber,@PathParam("functionNumber") String functionNumber) {
|
|
|
- this.session = session;
|
|
|
- webSocketMap.put((pageNumber+"_"+functionNumber),this);
|
|
|
- webSocketSet.add(this); //加入set中
|
|
|
- addOnlineCount(); //在线数加1
|
|
|
- log.info("有新窗口开始监听:页面编码:"+pageNumber+",功能编码:"+functionNumber+",当前在线人数为" + getOnlineCount());
|
|
|
+ public void onOpen(Session session, @PathParam("pageNumber") String pageNumber, @PathParam("functionNumber") String functionNumber
|
|
|
+ , @PathParam("keyid") String keyid) {
|
|
|
+
|
|
|
//目标连接成功后注入基本信息
|
|
|
- this.sid=sid;
|
|
|
+ this.session = session;
|
|
|
+ this.sid=session.getId();
|
|
|
this.pageNumber=pageNumber; //页面编码
|
|
|
this.functionNumber=functionNumber;//功能编码
|
|
|
- log.info("连接成功");
|
|
|
- if(getOnlineCount()>=1){
|
|
|
- this.restartJob((pageNumber+"_"+functionNumber));//重启任务
|
|
|
+ this.keyid=keyid;
|
|
|
+
|
|
|
+ StringBuilder sb=new StringBuilder();
|
|
|
+ sb.append(pageNumber).append("_").append(functionNumber);
|
|
|
+
|
|
|
+ jobkey=String.valueOf(sb);
|
|
|
+ //判断是否有对象编号
|
|
|
+ if(StringUtils.isNotNull(keyid))
|
|
|
+ {
|
|
|
+ sb.append("_").append(keyid);
|
|
|
+ }
|
|
|
+
|
|
|
+ pfkey=String.valueOf(sb);
|
|
|
+
|
|
|
+ //根据调度任务key进行map分组,二级存储不同seesion的WebSocketServer对象
|
|
|
+
|
|
|
+ webSocketMap.put(pfkey,this);
|
|
|
+
|
|
|
+
|
|
|
+ //根据调度任务统计数量
|
|
|
+ if(funcationNunberMap.containsKey(jobkey))
|
|
|
+ {
|
|
|
+ Integer number=funcationNunberMap.get(jobkey);
|
|
|
+ number++;
|
|
|
+ funcationNunberMap.put(jobkey,number);
|
|
|
+ }else
|
|
|
+ {
|
|
|
+ funcationNunberMap.put(jobkey,1);
|
|
|
+ }
|
|
|
+
|
|
|
+ //页数和功能数组合+对象编号(风场、项目、线路、风机)作为KEY,保存对应的set集合
|
|
|
+ if(webSocketMapSet.containsKey(pfkey))
|
|
|
+ {
|
|
|
+ CopyOnWriteArraySet<WebSocketServer> set=webSocketMapSet.get(pfkey);
|
|
|
+ set.add(this);
|
|
|
+
|
|
|
+ }else
|
|
|
+ {
|
|
|
+ CopyOnWriteArraySet<WebSocketServer> set=new CopyOnWriteArraySet<>();
|
|
|
+ set.add(this);
|
|
|
+ webSocketMapSet.put(pfkey,set);
|
|
|
}
|
|
|
+
|
|
|
+ addOnlineCount(); //在线数加1
|
|
|
+ log.info("有新窗口开始监听:页面编码:"+pageNumber+",功能编码:"+functionNumber+",当前在线人数为" + getOnlineCount());
|
|
|
+
|
|
|
+
|
|
|
+ if(keyidMap.containsKey(jobkey))
|
|
|
+ {
|
|
|
+ Set<String> set=keyidMap.get(jobkey);
|
|
|
+ set.add(keyid);
|
|
|
+ }else
|
|
|
+ {
|
|
|
+ Set<String> set= new HashSet<String>();
|
|
|
+ set.add(keyid);
|
|
|
+ keyidMap.put(jobkey,set);
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("连接成功");
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+// if(funcationNunberMap.containsKey(jobkey))
|
|
|
+// {
|
|
|
+// Integer number=funcationNunberMap.get(jobkey);
|
|
|
+// if(number==1){
|
|
|
+// this.restartJob((jobkey));//重启任务
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
+ this.restartJob((jobkey));//重启任务
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 连接关闭调用的方法
|
|
|
*/
|
|
|
@OnClose
|
|
|
- public void onClose() {
|
|
|
- webSocketMap.remove(this.pageNumber+"_"+this.functionNumber);
|
|
|
- webSocketSet.remove(this); //从set中删除
|
|
|
+ public void onClose(@PathParam("pageNumber") String pageNumber,@PathParam("functionNumber") String functionNumber,@PathParam("keyid") String keyid) {
|
|
|
+ StringBuilder sb=new StringBuilder();
|
|
|
+ sb.append(pageNumber).append("_").append(functionNumber);
|
|
|
+
|
|
|
+ jobkey=String.valueOf(sb);
|
|
|
+ //判断是否有对象编号
|
|
|
+ if(StringUtils.isNotNull(keyid))
|
|
|
+ {
|
|
|
+ sb.append("_").append(keyid);
|
|
|
+ }
|
|
|
+
|
|
|
+ pfkey=String.valueOf(sb);
|
|
|
+
|
|
|
+
|
|
|
+ if(webSocketMap.containsKey(pfkey))
|
|
|
+ {
|
|
|
+ webSocketMap.remove(pfkey);
|
|
|
+
|
|
|
+ }
|
|
|
+ if(webSocketMapSet.containsKey(pfkey))
|
|
|
+ {
|
|
|
+ CopyOnWriteArraySet<WebSocketServer> set=webSocketMapSet.get(pfkey);
|
|
|
+ set.remove(this);
|
|
|
+ }
|
|
|
subOnlineCount(); //在线数减1
|
|
|
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
|
|
|
+ if(funcationNunberMap.containsKey(jobkey))
|
|
|
+ {
|
|
|
+ int fnumber=funcationNunberMap.get(jobkey);
|
|
|
+ if(fnumber>0)
|
|
|
+ {
|
|
|
+ fnumber--;
|
|
|
+ }else {
|
|
|
+ fnumber=0;
|
|
|
+ }
|
|
|
+ funcationNunberMap.put(jobkey,fnumber);
|
|
|
+ if(fnumber==0){
|
|
|
+ this.deleteJob((jobkey));//关闭任务
|
|
|
+ keyidMap=new HashMap<>();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
if(getOnlineCount()==0){
|
|
|
- this.deleteJob((pageNumber+"_"+functionNumber));//关闭任务
|
|
|
+ this.deleteJob((jobkey));//关闭任务
|
|
|
+ keyidMap=new HashMap<>();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -86,13 +204,18 @@ public class WebSocketServer extends SocketTaskUtil {
|
|
|
public void onMessage(String message, Session session) {
|
|
|
log.info("收到来自窗口:"+pageNumber+",功能编码:"+functionNumber+"的信息:"+message);
|
|
|
//群发消息
|
|
|
- for (WebSocketServer item : webSocketSet) {
|
|
|
- try {
|
|
|
- item.sendMessage(message);
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
+ if(webSocketMapSet.containsKey(pfkey))
|
|
|
+ {
|
|
|
+ CopyOnWriteArraySet<WebSocketServer> set=webSocketMapSet.get(pfkey);
|
|
|
+ for (WebSocketServer item : set) {
|
|
|
+ try {
|
|
|
+ item.sendMessage(message);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -101,9 +224,51 @@ public class WebSocketServer extends SocketTaskUtil {
|
|
|
* @param error
|
|
|
*/
|
|
|
@OnError
|
|
|
- public void onError(Session session, Throwable error) {
|
|
|
+ public void onError(Session session, Throwable error,@PathParam("pageNumber") String pageNumber,@PathParam("functionNumber") String functionNumber,@PathParam("keyid") String keyid) {
|
|
|
log.error("发生错误");
|
|
|
- error.printStackTrace();
|
|
|
+
|
|
|
+ StringBuilder sb=new StringBuilder();
|
|
|
+ sb.append(pageNumber).append("_").append(functionNumber);
|
|
|
+
|
|
|
+ jobkey=String.valueOf(sb);
|
|
|
+ //判断是否有对象编号
|
|
|
+ if(StringUtils.isNotNull(keyid))
|
|
|
+ {
|
|
|
+ sb.append("_").append(keyid);
|
|
|
+ }
|
|
|
+
|
|
|
+ pfkey=String.valueOf(sb);
|
|
|
+
|
|
|
+ if(webSocketMap.containsKey(pfkey))
|
|
|
+ {
|
|
|
+ webSocketMap.remove(pfkey);
|
|
|
+
|
|
|
+ }
|
|
|
+ if(webSocketMapSet.containsKey(pfkey))
|
|
|
+ {
|
|
|
+ CopyOnWriteArraySet<WebSocketServer> set=webSocketMapSet.get(pfkey);
|
|
|
+ set.remove(this);
|
|
|
+ }
|
|
|
+ subOnlineCount(); //在线数减1
|
|
|
+ log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
|
|
|
+ if(funcationNunberMap.containsKey(jobkey))
|
|
|
+ {
|
|
|
+ int fnumber=funcationNunberMap.get(jobkey);
|
|
|
+ if(fnumber>0)
|
|
|
+ {
|
|
|
+ fnumber--;
|
|
|
+ }else {
|
|
|
+ fnumber=0;
|
|
|
+ }
|
|
|
+ funcationNunberMap.put(jobkey,fnumber);
|
|
|
+ if(fnumber==0){
|
|
|
+ this.deleteJob((jobkey));//关闭任务
|
|
|
+ keyidMap=new HashMap<>();
|
|
|
+ }
|
|
|
+ }else if(getOnlineCount()==0){
|
|
|
+ this.deleteJob((jobkey));//关闭任务
|
|
|
+ keyidMap=new HashMap<>();
|
|
|
+ }
|
|
|
}
|
|
|
/**
|
|
|
* 实现服务器主动推送
|
|
@@ -130,41 +295,60 @@ public class WebSocketServer extends SocketTaskUtil {
|
|
|
/**
|
|
|
* 群发自定义消息
|
|
|
* */
|
|
|
- public static void sendInfo(Object message,@PathParam("pageNumber") String pageNumber,@PathParam("functionNumber") String functionNumber) throws IOException, EncodeException {
|
|
|
- log.info("推送消息到窗口:"+pageNumber+",功能编码:"+functionNumber+"的信息:"+message);
|
|
|
- WebSocketServer item = webSocketMap.get(pageNumber + "_" + functionNumber);
|
|
|
-
|
|
|
- //这里可以设定只推送给这个sid的,为null则全部推送
|
|
|
- if(functionNumber==null||pageNumber==null) {
|
|
|
- throw new BindException("请核对编码信息(包括页面编码以及功能编码非空)");
|
|
|
- }else if(item.pageNumber.equals(pageNumber)&&item.functionNumber.equals(functionNumber)){
|
|
|
- item.sendMessage(message);
|
|
|
+// public static void sendInfo(Object message,@PathParam("pageNumber") String pageNumber,@PathParam("functionNumber") String functionNumber) throws IOException, EncodeException {
|
|
|
+// log.info("推送消息到窗口:"+pageNumber+",功能编码:"+functionNumber+"的信息:"+message);
|
|
|
+// WebSocketServer item = webSocketMap.get(pageNumber + "_" + functionNumber);
|
|
|
+//
|
|
|
+// //这里可以设定只推送给这个sid的,为null则全部推送
|
|
|
+// if(functionNumber==null||pageNumber==null) {
|
|
|
+// throw new BindException("请核对编码信息(包括页面编码以及功能编码非空)");
|
|
|
+// }else if(item.pageNumber.equals(pageNumber)&&item.functionNumber.equals(functionNumber)){
|
|
|
+// item.sendMessage(message);
|
|
|
+// }
|
|
|
+// //利用set容器循环的方式暂时被关闭,原因节约资源
|
|
|
+///* for (WebSocketServer item : webSocketSet) {
|
|
|
+// try {
|
|
|
+// //这里可以设定只推送给这个sid的,为null则全部推送
|
|
|
+// if(functionNumber==null||pageNumber==null) {
|
|
|
+// throw new BindException("请核对编码信息");
|
|
|
+// }else if(item.pageNumber.equals(pageNumber)&&item.functionNumber.equals(functionNumber)){
|
|
|
+// item.sendMessage(message);
|
|
|
+// }
|
|
|
+// } catch (IOException e) {
|
|
|
+// e.printStackTrace();
|
|
|
+// continue;
|
|
|
+// }
|
|
|
+// }*/
|
|
|
+// }
|
|
|
+ public void sendObject(Object message,@PathParam("pageNumber") String pageNumber,@PathParam("functionNumber") String functionNumber,@PathParam("keyid") String keyid) throws IOException, EncodeException {
|
|
|
+ log.info("推送消息到窗口:"+pageNumber+",功能编码:"+functionNumber+"的信息:"+message);
|
|
|
+
|
|
|
+
|
|
|
+ StringBuilder sb=new StringBuilder();
|
|
|
+ sb.append(pageNumber).append("_").append(functionNumber);
|
|
|
+
|
|
|
+ //判断是否有对象编号
|
|
|
+ if(StringUtils.isNotNull(keyid))
|
|
|
+ {
|
|
|
+ sb.append("_").append(keyid);
|
|
|
}
|
|
|
- //利用set容器循环的方式暂时被关闭,原因节约资源
|
|
|
-/* for (WebSocketServer item : webSocketSet) {
|
|
|
- try {
|
|
|
- //这里可以设定只推送给这个sid的,为null则全部推送
|
|
|
- if(functionNumber==null||pageNumber==null) {
|
|
|
- throw new BindException("请核对编码信息");
|
|
|
- }else if(item.pageNumber.equals(pageNumber)&&item.functionNumber.equals(functionNumber)){
|
|
|
- item.sendMessage(message);
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- continue;
|
|
|
+
|
|
|
+ pfkey=String.valueOf(sb);
|
|
|
+ if(webSocketMap.containsKey(pfkey))
|
|
|
+ {
|
|
|
+ WebSocketServer item =webSocketMap.get(pfkey);
|
|
|
+
|
|
|
+ //这里可以设定只推送给这个sid的,为null则全部推送
|
|
|
+ if(functionNumber==null||pageNumber==null) {
|
|
|
+ throw new BindException("请核对编码信息(包括页面编码以及功能编码非空)");
|
|
|
+ }else if(item.pageNumber.equals(pageNumber)&&item.functionNumber.equals(functionNumber)){
|
|
|
+ item.sendMessage(message);
|
|
|
}
|
|
|
- }*/
|
|
|
- }
|
|
|
- public void sendObject(Object message,@PathParam("pageNumber") String pageNumber,@PathParam("functionNumber") String functionNumber) throws IOException, EncodeException {
|
|
|
- log.info("推送消息到窗口:"+pageNumber+",功能编码:"+functionNumber+"的信息:"+message);
|
|
|
- WebSocketServer item = webSocketMap.get(pageNumber + "_" + functionNumber);
|
|
|
|
|
|
- //这里可以设定只推送给这个sid的,为null则全部推送
|
|
|
- if(functionNumber==null||pageNumber==null) {
|
|
|
- throw new BindException("请核对编码信息(包括页面编码以及功能编码非空)");
|
|
|
- }else if(item.pageNumber.equals(pageNumber)&&item.functionNumber.equals(functionNumber)){
|
|
|
- item.sendMessage(message);
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
//利用set容器循环的方式暂时被关闭,原因节约资源
|
|
|
/* for (WebSocketServer item : webSocketSet) {
|
|
|
try {
|
|
@@ -192,5 +376,8 @@ public class WebSocketServer extends SocketTaskUtil {
|
|
|
public static synchronized void subOnlineCount() {
|
|
|
WebSocketServer.onlineCount--;
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
}
|
|
|
|