Browse Source

长文件传输完善

songwenbin 1 year ago
parent
commit
a1d1a08772

+ 1 - 1
gdnxak102/src/main/java/com/gyee/edge/gdnxak/ApplicationBootstrap.java

@@ -79,7 +79,7 @@ public class ApplicationBootstrap implements CommandLineRunner {
             iec102Client.sendHeartBeat();
 
             //发送召唤用户数据指令
-            iec102Client.callUserData();
+            iec102Client.callUserData(1);
         }
 
     }

+ 2 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/forcast/ForcastService.java

@@ -42,6 +42,7 @@ public class ForcastService {
     }
 
     public ForcastModel getForcastModel() {
+        //todo: 未完结的报文不能发送
         return forcastQueue.poll();
     }
 
@@ -58,6 +59,7 @@ public class ForcastService {
         for (int i = 0; i < lineArray.length; i++) {
             //每行字符串line, 多个空格替换为一个空格
             String line = lineArray[i].replaceAll(" +"," ");
+           // System.out.println("****** " + line);
             //解析type和time,例子:<! Entity=ZWS05F type=CDQ time='2023-02-27_17:15' !>
             if (line.startsWith("<!")) {
                 //每行字符串按空格分割为fieldArray数组

File diff suppressed because it is too large
+ 38 - 2
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/BasicInstruction102.java


+ 1 - 1
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/FrameDecoder.java

@@ -49,7 +49,7 @@ public class FrameDecoder extends ByteToMessageDecoder {
                 ByteBuf data = buffer.readBytes(newDataLength);
                 out.add(data);
             } else if (header == 0x68) {
-                log.debug("0x680x680x680x680x680x680x680x680x68");
+
                 if (buffer.readableBytes() < 10) {
                     buffer.readerIndex(beginReader);
                     break;

+ 88 - 10
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102Client.java

@@ -2,6 +2,7 @@ package com.gyee.edge.gdnxak.iec102;
 
 import com.gyee.edge.gdnxak.forcast.ForcastModel;
 import com.gyee.edge.gdnxak.forcast.ForcastService;
+import com.gyee.edge.gdnxak.utils.ByteUtil;
 import com.gyee.edge.gdnxak.utils.SpringContextUtil;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
@@ -12,7 +13,9 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.UnsupportedEncodingException;
 import java.util.Date;
+import java.util.HashMap;
 
 @Slf4j
 public class Iec102Client {
@@ -29,6 +32,10 @@ public class Iec102Client {
     /** 将<code>Channel</code>保存起来, 可用于在其他非handler的地方发送数据 */
     private Channel channel;
 
+    //长文件缓存,待文件全部传输完毕后,再解析转发;例如:LLCDQ, DQ等
+    private Iec102Message longFileMessage;
+    private HashMap<String, Iec102Message> msgMap;
+
     public Iec102Client(String host, int port) {
         this.host = host;
         this.port = port;
@@ -103,11 +110,18 @@ public class Iec102Client {
         sendMessage(msg);
     }
 
-    public void callUserData() {
+    //todo: 多文件传输时,注意FCB位置的重置,即5a、7a交替发送
+    public void callUserData(int frameCnt) {
+        Iec102Message iec102Message = null;
+        log.info("framecnt = " + frameCnt);
         //召唤一级数据
-        Iec102Message msg23 = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_USER1);
+        if (0 == frameCnt%2){
+            iec102Message = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_USER11);
+        } else {
+            iec102Message = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_USER1);
+        }
 //        Iec102Message msg23 = BasicInstruction102.createFixFrameMessage(BasicInstruction102.M_USERDATA2);
-        sendMessage(msg23);
+        sendMessage(iec102Message);
     }
 
     public void close() {
@@ -130,16 +144,34 @@ public class Iec102Client {
                         break;
                 }
             } else if (Iec102FrameType.Mutable == response.getFrameType()) {
+
+                Iec102Message iMsg = checkFileMessage(response);
                 //处理用户数据
-                log.info("收到用户消息,文件名:" + response.getDataFile());
-                log.info("文件内容:" + response.getDataContent());
-                //todo: 解析光功率预测文件
-                ForcastModel forcastModel = forcastService.readForcastFileString(response.getDataContent());
-                forcastService.putForcastModel(forcastModel);
-                if (forcastModel != null) {
-                    log.info(forcastModel.dataType + ", size = " + forcastModel.getForcastData().size());
+                log.info("收到用户消息,文件名:" + iMsg.getDataFile());
+
+                if (0x08 == response.getCOT()) {
+                    //再次发送召唤消息
+                    callUserData(iMsg.getFrameCnt());
+                    return;
                 }
 
+                if (0x07 == response.getCOT()) {
+                    log.info("文件内容:" + iMsg.getDataContentString());
+
+                    //解析光功率预测文件
+                    ForcastModel forcastModel = forcastService.readForcastFileString(iMsg.getDataContentString());
+                    if (forcastModel != null) {
+//                        log.info(forcastModel.dataType + ", size = " + forcastModel.getForcastData().size());
+                        forcastService.putForcastModel(forcastModel);
+                    }
+
+                    if (iMsg.getFrameCnt() > 1) {
+                        //todo: 发送文件传输结束确认消息
+                        Iec102Message fileEndMsg = BasicInstruction102.createFileEndMessage(iMsg.getFrameCnt(), iMsg.getDataContent().length);
+                        sendMessage(fileEndMsg);
+                    }
+
+                }
             }
         } catch (Exception ex) {
             log.error(ex.getMessage());
@@ -148,6 +180,52 @@ public class Iec102Client {
     }
 
 
+    private Iec102Message checkFileMessage(Iec102Message newMsg) throws UnsupportedEncodingException {
+        String mapKey = newMsg.getDataFile();
+        if (msgMap == null) {
+            msgMap = new HashMap<>();
+        }
+        if (msgMap.containsKey(mapKey)) {
+            Iec102Message oldMsg = msgMap.get(mapKey);
+            oldMsg = mergeFileMessage(oldMsg, newMsg);
+            return oldMsg;
+        }else {
+            if (newMsg.getCOT() == 0x08) {
+                newMsg.setFrameCnt(2);  //召唤消息在文件消息之前
+                //不完整的新文件
+                msgMap.put(newMsg.getDataFile(), newMsg);
+            }
+
+            return newMsg;
+        }
+    }
+
+    private Iec102Message mergeFileMessage(Iec102Message oldMsg, Iec102Message newMsg) throws UnsupportedEncodingException {
+        String newMsgData = newMsg.getDataContentString();
+        byte[] newBytes;
+        if (newMsgData.startsWith("#")) {
+            newBytes = ByteUtil.byteMergerWith0D0A(oldMsg.getDataContent(), newMsg.getDataContent());
+        } else {
+            newBytes = ByteUtil.byteMerger(oldMsg.getDataContent(), newMsg.getDataContent());
+        }
+
+        oldMsg.setDataContent(newBytes);
+        oldMsg.setCOT(newMsg.getCOT());
+        oldMsg.setFrameCnt(oldMsg.getFrameCnt() + 1);
+
+        return oldMsg;
+    }
+
+//
+//    private void mergeFileMessage2(Iec102Message iec102Message) {
+//        if (longFileMessage == null)
+//            longFileMessage = iec102Message;
+//        else {
+//            byte[] newBytes = ByteUtil.byteMergerWith0D0A(longFileMessage.getDataContent(), iec102Message.getDataContent());
+//            longFileMessage.setDataContent(newBytes);
+//        }
+//    }
+
     private void sleep(int millionSencond) {
         try {
             Thread.sleep(millionSencond);

+ 10 - 3
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102Decoder.java

@@ -35,6 +35,13 @@ public class Iec102Decoder extends ByteToMessageDecoder {
 			msg102.setAddress(in.getShort(2));
 			msg102.setCheckSum(data[4]);
 			msg102.setFixedFrame(data);
+		} else if (data.length == 19 && data[0] == (byte)0x68) {
+			msg102.setFrameType(Iec102FrameType.Fixed);
+			msg102.setHeader(data[0]);
+			msg102.setControl1(data[4]);
+			msg102.setAddress(in.getShort(5));
+			msg102.setCheckSum(data[17]);
+			msg102.setFixedFrame(data);
 		} else if (data.length > 6 && data[0] == (byte)0x68) {
 			msg102.setFrameType(Iec102FrameType.Mutable);
 			msg102.setHeader(data[0]);
@@ -58,9 +65,9 @@ public class Iec102Decoder extends ByteToMessageDecoder {
 			String fileName = new String(arrFileName,"UTF-8");
 			msg102.setDataFile(fileName);
 
-			byte[] arrFileData = Arrays.copyOfRange(data, 46, data.length-3);
-			String fileData = new String(arrFileData,"GBK");
-			msg102.setDataContent(fileData);
+			byte[] arrFileData = Arrays.copyOfRange(data, 45, data.length-2);
+//			String fileData = new String(arrFileData,"GBK");
+			msg102.setDataContent(arrFileData);
 
 			//log.info(msg102.getDataContent());
 		}

+ 11 - 3
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102Message.java

@@ -3,6 +3,7 @@ package com.gyee.edge.gdnxak.iec102;
 
 import lombok.Data;
 
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Date;
 
@@ -28,7 +29,7 @@ public class Iec102Message {
 	//帧长
 	private short length;
 
-	//控制域 2字节
+	//控制域 1字节
 	private byte control1;
 
 	//地址域: 2字节
@@ -41,7 +42,7 @@ public class Iec102Message {
 	private byte TYP;
 	//可变结构限定词
 	private byte VSQ;
-	//传送原因
+	//传送原因 0x07 文件传输结束; 0x08文件传输未结束
 	private byte COT;
 	//设备地址
 	private short deviceAddress;
@@ -50,7 +51,7 @@ public class Iec102Message {
 	//数据文件名 32个字节
 	private String dataFile;
 	//数据文件内容,UTF-8格式
-	private String dataContent;
+	private byte[] dataContent;
 
 	//校验和
 	private byte checkSum;
@@ -60,4 +61,11 @@ public class Iec102Message {
 
 	private String hexString;
 
+	public String getDataContentString() throws UnsupportedEncodingException {
+		return new String(dataContent,"GBK");
+	}
+
+	private int frameCnt = 1;
+
+
 }

+ 24 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/utils/ByteUtil.java

@@ -419,4 +419,28 @@ public class ByteUtil {
         return ret;
     }
 
+
+    /**
+     * byte数组合并
+     * System.arraycopy()方法
+     * @param bt1
+     * @param bt2
+     * @return
+     */
+    public static byte[] byteMerger(byte[] bt1, byte[] bt2){
+        byte[] bt3 = new byte[bt1.length+bt2.length];
+        System.arraycopy(bt1, 0, bt3, 0, bt1.length);
+        System.arraycopy(bt2, 0, bt3, bt1.length, bt2.length);
+        return bt3;
+    }
+
+    public static byte[] byteMergerWith0D0A(byte[] bt1, byte[] bt2){
+        byte[] bt3 = new byte[bt1.length+bt2.length+2];
+        byte[] lineByte = new byte[] {0x0D, 0x0A};
+        System.arraycopy(bt1, 0, bt3, 0, bt1.length);
+        System.arraycopy(lineByte, 0,bt3, bt1.length, 2);
+        System.arraycopy(bt2, 0, bt3, bt1.length+2, bt2.length);
+        return bt3;
+    }
+
 }