Browse Source

解决了同一个地址多个session点表变量推送共享的问题

songwenbin 2 năm trước cách đây
mục cha
commit
1f0caedcff

+ 15 - 7
gddlly/src/main/java/com/gyee/edge/gddlly/iec104/Iec104Session.java

@@ -9,7 +9,7 @@ import com.gyee.edge.gddlly.iec104.protocol.TypeIdentifierEnum;
 import com.gyee.edge.gddlly.iec104.protocol.UControlEnum;
 import com.gyee.edge.gddlly.redis.RedisDataService;
 import com.gyee.edge.gddlly.utils.SpringContextUtil;
-import io.netty.channel.*;
+import io.netty.channel.ChannelHandlerContext;
 import lombok.extern.slf4j.Slf4j;
 
 import java.net.InetSocketAddress;
@@ -38,6 +38,8 @@ public class Iec104Session {
 
     private ArrayList<Point> aiList;
     private ArrayList<Point> diList;
+    private ArrayList<String> aiKeys;
+    private ArrayList<String> diKeys;
 
     public Iec104Session(ChannelHandlerContext ctx) {
         channelHandlerContext = ctx;
@@ -86,7 +88,7 @@ public class Iec104Session {
                         response = BasicInstruction104.createSysMessage(UControlEnum.STOPDT_YES);
                         break;
                     default:
-                        break;
+                        return;
                 }
                 if (response != null)
                     sendMessage2(response);
@@ -99,9 +101,15 @@ public class Iec104Session {
                 //acceptSeq = (short)(request.getSendSeq() + 1);
                 if (request.getTypeIdentifier() == TypeIdentifierEnum.generalCall) {
                     // 总召唤命令
-                    publicAddress = request.getTerminalAddress();
-                    aiList = pointService.getAiList().get((int)publicAddress);
-                    diList = pointService.getDiList().get((int)publicAddress);
+                    short newAddress = request.getTerminalAddress();
+                    if (publicAddress != newAddress) {
+                        publicAddress = request.getTerminalAddress();
+                        aiList = (ArrayList<Point>)pointService.getAiList().get((int)publicAddress).clone();
+                        diList = (ArrayList<Point>)pointService.getDiList().get((int)publicAddress).clone();
+                        aiKeys = pointService.getAiKeys((int)publicAddress);
+                        diKeys = pointService.getDiKeys((int)publicAddress);
+                    }
+
                     if (TransferReason.ACTIVATE == TransferReason.valueOf(request.getTransferReason())) {
                         state = SessionState.CALL_ALL;
                         sendMessage(BasicInstruction104.getYesGeneralCallRuleDetail104(acceptSeq, sendSeq));
@@ -163,7 +171,7 @@ public class Iec104Session {
                     while(isRunning) {
                         //todo: 从redis读取数据,发送到客户端
                         //1、从redis读取数据,填充aiMap、diMap中,point的value,consumed,lastUpdateTime;
-                        if (redisService.readRedisData(publicAddress,"AI")) {
+                        if (redisService.readRedisData(aiKeys, aiList)) {
                             if (state == SessionState.CALL_ALL) {
                                 //2、如果SessionState.CALL_ALL, 全量传输遥测点
                                 sendPoints(aiList, true, "AI");
@@ -175,7 +183,7 @@ public class Iec104Session {
                             }
                         }
 
-                        if (redisService.readRedisData(publicAddress,"DI")) {
+                        if (redisService.readRedisData(diKeys,diList)) {
                             if (state == SessionState.CALL_ALL) {
                                 //2、如果SessionState.CALL_ALL, 全量传输遥测点
                                 sendPoints(diList, true, "DI");

+ 9 - 2
gddlly/src/main/java/com/gyee/edge/gddlly/iec104/handler/Check104Handler.java

@@ -10,7 +10,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.ReferenceCountUtil;
 import lombok.extern.slf4j.Slf4j;
 
-
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 
 /**
@@ -37,7 +37,7 @@ public class Check104Handler extends ChannelInboundHandlerAdapter {
 
 		byte[] bytes = new byte[result.readableBytes()];
 		result.readBytes(bytes);
-		log.info("接收到的报文: " + ByteUtil.byteArrayToHexString(bytes));
+		log.info(getSessionId(ctx) + " 发送的报文: " + ByteUtil.byteArrayToHexString(bytes));
 		if (bytes.length < Iec104Constant.APCI_LENGTH || bytes[0] != Iec104Constant.HEAD_DATA) {
 			log.error("报文无效");
 			ReferenceCountUtil.release(result);
@@ -46,4 +46,11 @@ public class Check104Handler extends ChannelInboundHandlerAdapter {
 			ctx.fireChannelRead(msg);
 		}
 	}
+
+	private String getSessionId(ChannelHandlerContext ctx) {
+		InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
+		String clientIp = ipSocket.getAddress().getHostAddress();
+		int clientPort = ipSocket.getPort();
+		return clientIp + ":" + clientPort;
+	}
 }

+ 6 - 1
gddlly/src/main/java/com/gyee/edge/gddlly/iec104/handler/Iec104ServerHandler.java

@@ -27,11 +27,15 @@ public class Iec104ServerHandler extends SimpleChannelInboundHandler<Iec104Messa
 		}
 
 		sessionMap.put(session.getId(), session);
+		log.info("建立新连接,sessionid = " + session.getId());
+		log.info("当前连接数:" + sessionMap.keySet().size());
 	}
 
 	@Override
 	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
 		closeSession(ctx);
+		log.info("连接断开,sessionid =" + getSessionId(ctx));
+		log.info("当前连接数:" + sessionMap.size());
 	}
 	
 	@Override
@@ -51,7 +55,8 @@ public class Iec104ServerHandler extends SimpleChannelInboundHandler<Iec104Messa
 		cause.printStackTrace();
 
 		closeSession(ctx);
-
+		log.info("连接异常断开,sessionid =" + getSessionId(ctx));
+		log.info("当前连接数:" + sessionMap.size());
 
 		//todo: 异常处理
 		//ctx.close();

+ 54 - 0
gddlly/src/main/java/com/gyee/edge/gddlly/redis/RedisDataService.java

@@ -125,4 +125,58 @@ public class RedisDataService {
         return true;
     }
 
+    public boolean readRedisData(  List<String> keys, List<Point>  points){
+        try {
+
+            if (keys == null || points == null || keys.isEmpty() ||
+                    keys.size() != points.size() ) {
+                log.error("readRedisData 参数错误 ");
+                return false;
+            }
+
+            List<String> vals = (List<String>)this.redisUtil.getmAll(keys);
+            if (vals == null || vals.isEmpty() || vals.size() != keys.size()) {
+                log.error("读取redis错误!");
+                return false;
+            }
+
+            for (int i = 0; i < keys.size(); i++){
+                Point point = points.get(i);
+                String val = vals.get(i);
+                if (point == null || !StringUtils.hasText(val))
+                    continue;
+
+                String[] vls = val.split(":");
+                if (vls.length != 2)
+                    continue;
+
+                String ts = vls[0].trim();
+                String vs = vls[1].trim();
+
+                long time = Long.valueOf(ts.substring(ts.indexOf("\"") + 1, ts.length() - 1));
+                double value = Double.valueOf(vs.substring(1, vs.lastIndexOf("\"")));
+
+                if (point.getValue() != value ) {
+                    point.setConsumed(false);
+                    point.setValue(value);
+                    point.setLastUpdateTime(new Date(time));
+                }
+
+//                if (point.getLastUpdateTime() == null)
+//                    point.setLastUpdateTime(new Date(time));
+
+
+
+            }
+
+        } catch (Exception e) {
+            log.error(e.getMessage());
+            e.printStackTrace();
+            return false;
+        }
+
+        return  true;
+    }
+
+
 }