Просмотр исходного кода

1.新增gyfp2协议模块,包括序列化和反序列化,2.通过openfeign调用适配器接口数据写入golden,3.定义sqlite配置文件格式

‘xugp 2 лет назад
Родитель
Сommit
e021059965
45 измененных файлов с 3107 добавлено и 3550 удалено
  1. 14 9
      common/utils/src/main/java/com/gyee/edge/common/utils/ByteUtil.java
  2. 1630 0
      common/utils/src/main/java/com/gyee/edge/common/utils/BytesUtils.java
  3. 1 0
      gateway/build.gradle
  4. 4 1
      gateway/src/main/java/com/gyee/edge/gateway/ApplicationEventListener.java
  5. 18 7
      gateway/src/main/java/com/gyee/edge/gateway/bridge/rocketmq/producter/AsyncProducter.java
  6. 57 0
      gateway/src/main/java/com/gyee/edge/gateway/bridge/rocketmq/producter/Gyfp2Protocol.java
  7. 0 84
      gateway/src/main/java/com/gyee/edge/gateway/bridge/test/ReadGolden.java
  8. 0 10
      gateway/src/main/java/com/gyee/edge/gateway/bridge/test/StringKey.java
  9. 8 0
      gateway/src/main/java/com/gyee/edge/gateway/config/NettyHttpServer.java
  10. 33 0
      gateway/src/main/java/com/gyee/edge/gateway/config/sqlite/EndPoint.java
  11. 12 0
      gateway/src/main/java/com/gyee/edge/gateway/config/sqlite/ProtocolType.java
  12. 0 54
      gateway/src/main/java/com/gyee/edge/gateway/message/GYMessage.java
  13. 0 61
      gateway/src/main/java/com/gyee/edge/gateway/message/MessageData.java
  14. 0 52
      gateway/src/main/java/com/gyee/edge/gateway/message/MessageMutable.java
  15. 0 50
      gateway/src/main/java/com/gyee/edge/gateway/message/TestHalp.java
  16. 0 1455
      gateway/src/main/java/com/gyee/edge/gateway/protobuf/UserProto.java
  17. 0 81
      gateway/src/main/java/com/gyee/edge/gateway/protobuf/UserProtoTest.java
  18. 0 24
      gateway/src/main/java/com/gyee/edge/gateway/protobuf/user.proto
  19. 1 0
      gateway/src/main/java/com/gyee/edge/gateway/restful/iohandler/HttpServerHandler.java
  20. 2 0
      gateway/src/main/resources/application.yaml
  21. 2 0
      loader/build.gradle
  22. 4 4
      loader/src/main/java/com/gyee/edge/loader/ApplicationConsumerEventListener.java
  23. 237 0
      loader/src/main/java/com/gyee/edge/loader/adapter/DataWrite.java
  24. 0 11
      loader/src/main/java/com/gyee/edge/loader/data/Datas.java
  25. 8 14
      loader/src/main/java/com/gyee/edge/loader/data/GeneralTsData.java
  26. 0 15
      loader/src/main/java/com/gyee/edge/loader/data/TsData.java
  27. 0 15
      loader/src/main/java/com/gyee/edge/loader/data/TsDataType.java
  28. 8 13
      loader/src/main/java/com/gyee/edge/loader/data/TsPointData.java
  29. 0 13
      loader/src/main/java/com/gyee/edge/loader/data/User.java
  30. 0 53
      loader/src/main/java/com/gyee/edge/loader/golden/DataWrite.java
  31. 21 0
      loader/src/main/java/com/gyee/edge/loader/mapper/DBPointMapper.java
  32. 0 1455
      loader/src/main/java/com/gyee/edge/loader/protobuf/UserProto.java
  33. 0 24
      loader/src/main/java/com/gyee/edge/loader/protobuf/user.proto
  34. 44 42
      loader/src/main/java/com/gyee/edge/loader/rocketmq/comuser/BalanceComuser.java
  35. 2 3
      loader/src/main/java/com/gyee/edge/loader/service/WriteDataService.java
  36. 74 0
      loader/src/main/java/com/gyee/edge/loader/sqlite/ReadDBPointData.java
  37. 364 0
      loader/src/main/java/com/gyee/edge/loader/sqlite/config/DB.java
  38. 62 0
      loader/src/main/java/com/gyee/edge/loader/sqlite/config/Database.java
  39. 9 0
      loader/src/main/resources/application.yaml
  40. BIN
      loader/src/main/resources/dbpointmapper.sqlite3
  41. 17 0
      protocol/gyfp2/build.gradle
  42. 73 0
      protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/GYMessage.java
  43. 307 0
      protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/MessageData.java
  44. 94 0
      protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/MessageMutable.java
  45. 1 0
      settings.gradle

+ 14 - 9
common/utils/src/main/java/com/gyee/edge/common/utils/ByteUtil.java

@@ -1,6 +1,10 @@
 package com.gyee.edge.common.utils;
 
+import java.util.Optional;
+
 public class ByteUtil {
+
+
     /**
      * 转换short为byte
      *
@@ -165,15 +169,16 @@ public class ByteUtil {
      * @return
      */
     public static float getFloat(byte[] b, int index) {
-        int l;
-        l = b[index + 0];
-        l &= 0xff;
-        l |= ((long) b[index + 1] << 8);
-        l &= 0xffff;
-        l |= ((long) b[index + 2] << 16);
-        l &= 0xffffff;
-        l |= ((long) b[index + 3] << 24);
-        return Float.intBitsToFloat(l);
+        int accum = 0;
+        accum= accum|(b[index + 0] & 0xff) << 0;
+
+        accum= accum|(b[index + 1] & 0xff) << 8;
+
+        accum= accum|(b[index + 2] & 0xff) << 16;
+
+        accum= accum|(b[index + 3] & 0xff) << 24;
+
+        return Float.intBitsToFloat(accum);
     }
 
     /**

Разница между файлами не показана из-за своего большого размера
+ 1630 - 0
common/utils/src/main/java/com/gyee/edge/common/utils/BytesUtils.java


+ 1 - 0
gateway/build.gradle

@@ -16,6 +16,7 @@ apply plugin: 'io.spring.dependency-management'
 
 dependencies {
     implementation project(":common:utils")
+    implementation project(":protocol:gyfp2")
     implementation("io.netty:netty-all:$nettyVersion")
     implementation("$bootGroup:spring-boot-starter:$springBootVersion")
     implementation("com.google.code.gson:gson:$gsonVersion")

+ 4 - 1
gateway/src/main/java/com/gyee/edge/gateway/ApplicationEventListener.java

@@ -16,12 +16,15 @@ import org.springframework.stereotype.Component;
 public class ApplicationEventListener implements
         ApplicationListener<ApplicationReadyEvent> {
 
+    @Autowired
+    private AsyncProducter asyncProducter;
     @SneakyThrows
     @Override
     public void onApplicationEvent(ApplicationReadyEvent event) {
         log.info("ApplicationEvent  rised!");
         log.info("listener: " + event.toString());
-        AsyncProducter.dataSendProducter();
+
+        asyncProducter.dataSendProducter();
     }
 
 }

+ 18 - 7
gateway/src/main/java/com/gyee/edge/gateway/bridge/rocketmq/producter/AsyncProducter.java

@@ -1,31 +1,42 @@
 package com.gyee.edge.gateway.bridge.rocketmq.producter;
 
 
-import com.gyee.edge.gateway.bridge.test.ReadGolden;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
 
 //异步请求
 @Slf4j
+@Component
 public class AsyncProducter {
-    public static void dataSendProducter() throws Exception {
+
+    @Value("${rocketmq.namesrvaddr}")
+    String nameSrvAddr;
+
+    @Autowired
+    private Gyfp2Protocol gyfp2Protocol;
+    public  void dataSendProducter() throws Exception {
         //实例化消息的生产者
         DefaultMQProducer producer = new DefaultMQProducer("group_test");
         //设置nameServer的地址
-        producer.setNamesrvAddr("127.0.0.1:9876");
+        producer.setNamesrvAddr(nameSrvAddr);
         //启动producer实例
         producer.start();
-        byte[] bytes = ReadGolden.protoData();
+        //支持protobuf协议          byte[] bytes = ReadGolden.protoData();
+        //支持GYFP2协议
+        byte[] bytes = gyfp2Protocol.toBytes();
             //创建消息
             Message message = new Message("keyMessage"/*Topic*/,
                     "TagA"/* Tag*/, "OrderID888", bytes);
             producer.send(message, new SendCallback() {
                 @Override
                 public void onSuccess(SendResult sendResult) {
-                    System.out.println(sendResult);
+                    log.info("消息发送成功",sendResult);
                 }
 
                 @Override
@@ -34,7 +45,7 @@ public class AsyncProducter {
                     throwable.printStackTrace();
                 }
             });
-//            Thread.sleep(1000);
-//        producer.shutdown();
+            /*Thread.sleep(1000);
+        producer.shutdown();*/
     }
 }

+ 57 - 0
gateway/src/main/java/com/gyee/edge/gateway/bridge/rocketmq/producter/Gyfp2Protocol.java

@@ -0,0 +1,57 @@
+package com.gyee.edge.gateway.bridge.rocketmq.producter;
+
+import com.gyee.protocol.gyfp2.message.GYMessage;
+import com.gyee.protocol.gyfp2.message.MessageData;
+import com.gyee.protocol.gyfp2.message.MessageMutable;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+@Slf4j
+@Component
+public class Gyfp2Protocol {
+    public byte[] toBytes(){
+        GYMessage gyMessage = new GYMessage();
+        gyMessage.setAttribute(0);
+
+        MessageMutable messageMutable = new MessageMutable();
+        messageMutable.setMessageType((byte) 11);
+        messageMutable.setKey(1002);
+        messageMutable.setTs(new Date().getTime());
+        gyMessage.setMessageMutable(messageMutable);
+
+        List<MessageData> list = new ArrayList<>();
+//        for (int i = 0;i < 1000;i++){
+//            MessageData messageData = new MessageData();
+//            messageData.setGYFP2_TYPE_FLOAT32(java.util.Optional.of(28.5f));
+//            messageData.setGYFP2_TYPE_BOOLEAN(java.util.Optional.of(true));
+//            double d = 1 + Math.random() * (80 - 1);
+//            messageData.setGYFP2_TYPE_FLOAT64(java.util.Optional.of(d));
+//            list.add(messageData);
+//        }
+
+        MessageData messageData = new MessageData();
+        messageData.setDataKey(1002);
+        messageData.setDataType((byte) 0);
+        messageData.setTs(new Date().getTime());
+        messageData.setGYFP2_TYPE_BOOLEAN(java.util.Optional.of(false));
+        list.add(messageData);
+
+        MessageData messageData1 = new MessageData();
+        messageData1.setDataKey(1002);
+        messageData1.setDataType((byte) 11);
+        messageData1.setTs(new Date().getTime());
+        double d = 1 + Math.random() * (80 - 1);
+        messageData1.setGYFP2_TYPE_FLOAT64(java.util.Optional.of(d));
+        list.add(messageData1);
+        gyMessage.setMessageDataList(list);
+        Date sTdate = new Date();
+        byte[] bytes = gyMessage.toBytes();
+        Date eTdate = new Date();
+        log.info("序列化时间"+(eTdate.getTime()-sTdate.getTime()));
+        return bytes;
+    }
+}

+ 0 - 84
gateway/src/main/java/com/gyee/edge/gateway/bridge/test/ReadGolden.java

@@ -1,84 +0,0 @@
-package com.gyee.edge.gateway.bridge.test;
-
-
-import com.gyee.edge.common.utils.ByteUtil;
-import com.gyee.edge.gateway.protobuf.UserProto;
-
-import java.io.UnsupportedEncodingException;
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
-
-public class ReadGolden {
-    public static byte[] protoData() throws UnsupportedEncodingException {
-        String[] tagNames = StringKey.key.split(",");
-        List<String> list = Arrays.asList(tagNames);
-        Map<String, Object> map = new HashMap<>();
-        Map<byte[], byte[]> bl = ReadGolden.putByte(list);
-        Date ST=new Date();
-        System.out.println("序列化前时间"+(ST.getTime()));
-        UserProto.User.Builder user = UserProto.User.newBuilder();
-        UserProto.User.Datas.Builder objectMap = UserProto.User.Datas.newBuilder();
-        for (Map.Entry<byte[],byte[]> entry:bl.entrySet()){
-            String keys = ReadGolden.byteToString(entry.getKey());
-            double val = ByteUtil.getDouble(entry.getValue(), 0);
-            long ts = ByteUtil.getLong2(entry.getValue(), 8);
-            user.putMap(keys, objectMap.setTs(ts).setValue(val).build());
-        }
-        UserProto.User build = user.build();
-        byte[] s = build.toByteArray();
-        Date ET=new Date();
-        System.out.println("序列化后时间"+(ET.getTime()));
-        System.out.println("序列化时间:"+(ET.getTime()-ST.getTime()));
-        return s;
-    }
-
-
-    public static Map<byte[], byte[]> putByte(List<String> lists) throws UnsupportedEncodingException {
-        Map<byte[], byte[]> b = new HashMap<>();
-        byte[][] rawKeys = new byte[lists.size()][];
-        double min = 1.0;
-        double max = 100.0;
-        for (int i = 0; i < lists.size(); i++) {
-            rawKeys[i] = lists.get(i).getBytes("ascii");
-            byte[] bb = {};
-            //随机生成double
-            double generatedDouble = ThreadLocalRandom.current().nextDouble(min, max);
-            byte[] bt1 = ByteUtil.putDouble(bb, generatedDouble, 0);
-            long time = new Date().getTime();
-            byte[] bt2 = ByteUtil.putLong(bb, time, 0);
-            byte[] bt3 = new byte[bt1.length + bt2.length];
-            bt3[0] = bt1[0];
-            bt3[1] = bt1[1];
-            bt3[2] = bt1[2];
-            bt3[3] = bt1[3];
-            bt3[4] = bt1[4];
-            bt3[5] = bt1[5];
-            bt3[6] = bt1[6];
-            bt3[7] = bt1[7];
-            bt3[8] = bt2[0];
-            bt3[9] = bt2[1];
-            bt3[10] = bt2[2];
-            bt3[11] = bt2[3];
-            bt3[12] = bt2[4];
-            bt3[13] = bt2[5];
-            bt3[14] = bt2[6];
-            bt3[15] = bt2[7];
-            b.put(rawKeys[i], bt3);
-        }
-        return b;
-    }
-
-    private static String byteToString (byte[] bytes) {
-        if (null == bytes || bytes.length == 0) {
-            return "";
-        }
-        String strContent = "";
-        try {
-            strContent = new String(bytes, "utf-8");
-        } catch (UnsupportedEncodingException e) {
-            e.printStackTrace();
-        }
-        return strContent;
-    }
-
-}

+ 0 - 10
gateway/src/main/java/com/gyee/edge/gateway/bridge/test/StringKey.java

@@ -1,10 +0,0 @@
-package com.gyee.edge.gateway.bridge.test;
-
-public class StringKey {
-     public static String key ="SHSJ.NX_GD_SH_XXX_XXX_0001_PLD," +
-             "SHSJ.NX_GD_SH_XXX_XXX_0002_PLD," +
-             "SHSJ.NX_GD_SH_XXX_XXX_0003_PLD," +
-             "SHSJ.NX_GD_SH_XXX_XXX_0004_PLD," +
-             "SHSJ.NX_GD_SH_XXX_XXX_0005_PLD," ;
-
-}

+ 8 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/NettyHttpServer.java

@@ -42,14 +42,18 @@ public class NettyHttpServer implements ApplicationListener<ApplicationStartedEv
     @Override
     public void onApplicationEvent(@NonNull ApplicationStartedEvent event) {
 
+        //创建服务器端的启动对象,配置参数
         ServerBootstrap bootstrap = new ServerBootstrap();
         EventLoopGroup bossGroup = new NioEventLoopGroup();
         EventLoopGroup workerGroup = new NioEventLoopGroup();
 
+        //设置两个线程组
         bootstrap.group(bossGroup, workerGroup);
+        //使用NioSocketChannel 作为服务器的通道实现
         bootstrap.channel(NioServerSocketChannel.class);
         bootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
         bootstrap.childOption(NioChannelOption.SO_REUSEADDR,true);
+        //设置保持活动连接状态
         bootstrap.childOption(NioChannelOption.SO_KEEPALIVE,false);
         bootstrap.childOption(NioChannelOption.SO_RCVBUF, 2048);
         bootstrap.childOption(NioChannelOption.SO_SNDBUF, 2048);
@@ -61,9 +65,12 @@ public class NettyHttpServer implements ApplicationListener<ApplicationStartedEv
                 ch.pipeline().addLast("logging", new FilterLogginglHandler());
                 ch.pipeline().addLast("interceptor", interceptorHandler);
                 ch.pipeline().addLast("bizHandler", httpServerHandler);
+                //管道设置处理器
             }
         })
         ;
+        //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
+        //启动服务器(并绑定端口)
         ChannelFuture channelFuture = bootstrap.bind(port).syncUninterruptibly().addListener(future -> {
             String logBanner = "\n\n" +
                     "* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n" +
@@ -75,6 +82,7 @@ public class NettyHttpServer implements ApplicationListener<ApplicationStartedEv
                     "* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n";
             LOGGER.info(logBanner, port);
         });
+        //对关闭通道进行监听  异步
         channelFuture.channel().closeFuture().addListener(future -> {
             LOGGER.info("Netty Http Server Start Shutdown ............");
             bossGroup.shutdownGracefully();

+ 33 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/sqlite/EndPoint.java

@@ -0,0 +1,33 @@
+package com.gyee.edge.gateway.config.sqlite;
+
+import lombok.Data;
+
+@Data
+public class EndPoint {
+    String ipAddress;
+
+    int ipPort;
+
+    ProtocolType protocolType;
+
+    //串口端口
+    String serialPort;
+
+    //波特率
+    int baudRate;
+
+    //开始位
+    int startBit;
+
+    //停止位
+    int stopBit;
+
+    //校验位
+    int checkBit;
+
+    String username;
+
+    String password;
+
+    String token;
+}

+ 12 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/sqlite/ProtocolType.java

@@ -0,0 +1,12 @@
+package com.gyee.edge.gateway.config.sqlite;
+
+
+//协议类型
+public enum ProtocolType {
+    MODBUS,
+    IEC104,
+    IEC102,
+    GYFP2,
+    CDT,
+    GLYC
+}

+ 0 - 54
gateway/src/main/java/com/gyee/edge/gateway/message/GYMessage.java

@@ -1,54 +0,0 @@
-package com.gyee.edge.gateway.message;
-
-import com.gyee.edge.common.utils.ByteUtil;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import lombok.Data;
-import java.io.*;
-import java.util.List;
-
-@Data
-public class GYMessage implements Serializable {
-    private byte[] header=new byte[]{'G','Y','F','P','V','2','0','0'};
-    private short length;
-    private short count;
-    private int attribute;
-    private List<MessageData> messageDataList;
-    private MessageMutable messageMutable;
-
-
-
-     public byte[] toBytes(){
-         //不指定默认256,指定64以内默认大小都是64.
-         /*如果写入后数据大小未超过512个字节,则选择下一个16的整数倍进行扩容。比如写入数据后大小为12,则扩容后的capacity是16。
-         如果写入后数据大小超过512个字节,则选择下一个2n。比如写入后大小是512字节,则扩容后的capacity是210=1024。(因为29=512,长度已经不够了)
-         扩容不能超过max capacity,否则会报错*/
-         ByteBuf buffer = Unpooled.buffer(10);
-         buffer.writeBytes(getHeader());
-         buffer.writeShort(getLength());
-         buffer.writeShort(getCount());
-         buffer.writeInt(getAttribute());
-         buffer.writeBytes(getMessageMutable().toBytes(getAttribute()));
-         for (MessageData messageData : getMessageDataList()){
-             buffer.writeBytes(messageData.toBytes(getAttribute()));
-         }
-         return buffer.touch().array();
-     }
-
-     public GYMessage toObject(byte[] bytes){
-         GYMessage gyMessage = new GYMessage();
-         gyMessage.setLength(ByteUtil.getShort(bytes,8));
-         gyMessage.setCount(ByteUtil.getShort(bytes,11));
-         gyMessage.setAttribute(ByteUtil.getInt(bytes,12));
-         gyMessage.setMessageMutable(getMessageMutable().toObject(bytes,getAttribute()));
-         gyMessage.setMessageDataList(MessageData.toObject(bytes,getAttribute()));
-         return gyMessage;
-     }
-
-
-
-
-
-
-
-}

+ 0 - 61
gateway/src/main/java/com/gyee/edge/gateway/message/MessageData.java

@@ -1,61 +0,0 @@
-package com.gyee.edge.gateway.message;
-
-import com.gyee.edge.common.utils.ByteUtil;
-import lombok.Data;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.ArrayList;
-import java.util.List;
-
-@Data
-public class MessageData implements Serializable {
-    private int dataKey;
-    private int ts;
-    private byte dataType;
-    private float dataValue;
-
-
-    public byte[] toBytes(int attribute){
-        ByteBuffer byteBuffer = ByteBuffer.allocate(11).order(ByteOrder.BIG_ENDIAN);
-        if(attribute==0){
-            byteBuffer.putFloat(getDataValue());
-        }else if(attribute==1){
-            byteBuffer.putInt(getDataKey());
-            byteBuffer.putFloat(getDataValue());
-        }else if(attribute==2){
-            byteBuffer.putInt(getTs());
-            byteBuffer.putFloat(getDataValue());
-        }else if (attribute==3){
-            byteBuffer.put(getDataType());
-            byteBuffer.putInt(getTs());
-            byteBuffer.putFloat(getDataValue());
-        }return byteBuffer.array();
-    }
-
-    public static List<MessageData> toObject(byte[] bytes, int attribute) {
-        List<MessageData> messageDataList = new ArrayList<>();
-        int count = bytes.length - 25;
-        byte[] bytes1 = new byte[count];
-        System.arraycopy(bytes, 25, bytes1, 0, count);
-        for (int i = 0; i < bytes1.length; i=i+11) {
-            MessageData messageData = new MessageData();
-            if (attribute == 0) {
-                messageData.setDataValue(ByteUtil.getFloat(bytes1,i));
-            } else if (attribute == 1) {
-                messageData.setDataKey(ByteUtil.getInt(bytes1,i));
-                messageData.setDataValue(ByteUtil.getFloat(bytes1,i+4));
-            } else if (attribute == 2) {
-                messageData.setTs(ByteUtil.getInt(bytes1,i));
-                messageData.setDataValue(ByteUtil.getFloat(bytes1,i+4));
-            } else if (attribute == 3) {
-                messageData.setDataType(bytes1[i]);
-                messageData.setTs(ByteUtil.getInt(bytes1,i+1));
-                messageData.setDataValue(ByteUtil.getFloat(bytes1,i+5));
-            }
-            messageDataList.add(messageData);
-        }
-        return messageDataList;
-    }
-}

+ 0 - 52
gateway/src/main/java/com/gyee/edge/gateway/message/MessageMutable.java

@@ -1,52 +0,0 @@
-package com.gyee.edge.gateway.message;
-
-
-import com.gyee.edge.common.utils.ByteUtil;
-import lombok.Data;
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-
-@Data
-public class MessageMutable implements Serializable {
-    private byte messageType;
-    private int key;
-    private int ts;
-
-    public byte[] toBytes(int attribute){
-        ByteBuffer byteBuffer = ByteBuffer.allocate(9).order(ByteOrder.BIG_ENDIAN);
-        if(attribute==0){
-            byteBuffer.putInt(getKey());
-        }else if(attribute==1){
-            byteBuffer.put(getMessageType());
-        }else if (attribute==2){
-            byteBuffer.putInt(getTs());
-        }else if(attribute==3){
-            byteBuffer.put(getMessageType());
-            byteBuffer.putInt(getKey());
-            byteBuffer.putInt(getTs());
-        }
-        byte[] array = byteBuffer.array();
-        return array;
-    }
-
-    public MessageMutable toObject(byte[] bytes,int attribute){
-        byte[] bytes1 = new byte[9];
-        System.arraycopy(bytes,16,bytes1,0,9);
-        MessageMutable messageMutable = new MessageMutable();
-        if(attribute==0){
-            messageMutable.setKey(ByteUtil.getInt(bytes1,0));
-        }else if(attribute==1){
-            messageMutable.setMessageType(bytes1[0]);
-        }else if (attribute==2){
-            messageMutable.setTs(ByteUtil.getInt(bytes1,0));
-        }else if(attribute==3){
-            messageMutable.setMessageType(bytes1[0]);
-            messageMutable.setKey(ByteUtil.getInt(bytes1,1));
-            messageMutable.setTs(ByteUtil.getInt(bytes1,5));
-        }
-        return messageMutable;
-    }
-}

+ 0 - 50
gateway/src/main/java/com/gyee/edge/gateway/message/TestHalp.java

@@ -1,50 +0,0 @@
-package com.gyee.edge.gateway.message;
-import com.google.gson.Gson;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class TestHalp {
-    public static void main(String[] args) {
-        GYMessage gyMessage = new GYMessage();
-        gyMessage.setAttribute(3);
-        gyMessage.setCount((short) 4);
-        gyMessage.setLength(gyMessage.getLength());
-
-        MessageMutable messageMutable = new MessageMutable();
-        messageMutable.setMessageType((byte) 1);
-        messageMutable.setKey(1);
-        messageMutable.setTs(10);
-        gyMessage.setMessageMutable(messageMutable);
-
-        List<MessageData> list = new ArrayList<>();
-        MessageData messageData = new MessageData();
-        messageData.setDataKey(1);
-        messageData.setDataType((byte) 1);
-        messageData.setDataValue(1);
-        messageData.setTs(1);
-
-
-        list.add(messageData);
-        gyMessage.setMessageDataList(list);
-        byte[] bytes = gyMessage.toBytes();
-
-        System.out.println(bytes);
-        GYMessage gyMessage1 = gyMessage.toObject(bytes);
-        System.out.println(gyMessage1);
-
-
-
-//        byte[] objectToByteArray = gyMessage.objectToByteArray(gyMessage);
-//        System.err.println("objectToByteArray: " + objectToByteArray);
-//
-//
-//        Object byteArrayToObject = gyMessage.byteArrayToObject(objectToByteArray);
-//        System.err.println("byteArrayToObject: " + byteArrayToObject.toString());
-//        GYMessage d=(GYMessage)byteArrayToObject;
-//        System.out.println(JSON.toJSONString(d));
-    }
-
-
-}

Разница между файлами не показана из-за своего большого размера
+ 0 - 1455
gateway/src/main/java/com/gyee/edge/gateway/protobuf/UserProto.java


+ 0 - 81
gateway/src/main/java/com/gyee/edge/gateway/protobuf/UserProtoTest.java

@@ -1,81 +0,0 @@
-package com.gyee.edge.gateway.protobuf;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.MessageOrBuilder;
-import com.google.protobuf.TextFormat;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-@Slf4j
-public class UserProtoTest {
-//    public static void main(String[] args) {
-//        UserProto.User.Builder user = UserProto.User.newBuilder();
-//        System.out.println("测试两千个点数据:");
-//        Map<String,Double> map = new HashMap<>();
-//        for(int i = 1;i<=2000;i++){
-//            map.put(i+"",Math.random());
-//        }
-//        user.putAllMap(map);
-//        Date ST=new Date();
-//        UserProto.User build1 = user.build();
-//        byte[] s1 = build1.toByteArray();
-//        Date ET=new Date();
-//        System.out.println("序列化时间:"+(ET.getTime()-ST.getTime()));
-//        System.out.println("protobuf序列化大小: " + s1.length);
-//        UserProto.User user1 = null;
-//        try {
-//            //反序列化
-//            Date date1 = new Date();
-//            user1 = UserProto.User.parseFrom(s1);
-//            Date date2 = new Date();
-//            System.out.println("反序列化时间"+(date2.getTime()-date1.getTime()));
-//        } catch (InvalidProtocolBufferException e) {
-//            e.printStackTrace();
-//        }
-//
-//        System.out.println("--------------------");
-//
-//        System.out.println("测试两千个点一半数据空值测试:");
-//        user.clear();
-//        map.clear();
-//        for(int j = 1;j<=2000;j++){
-//            if (j<=1000){
-//                map.put(j+"",Math.random());
-//            }else {
-//                map.put(j+"",null);
-//            }
-//        }
-//        user.putAllMap(map);
-//        Date ST1=new Date();
-//        UserProto.User build2 = user.build();
-//        byte[] s2 = build2.toByteArray();
-//        Date ET2=new Date();
-//        System.out.println("序列化时间:"+(ET2.getTime()-ST1.getTime()));
-//        System.out.println("protobuf一半数据空值序列化大小: " + s2.length);
-//        UserProto.User users2 = null;
-//        try {
-//            //反序列化
-//            Date date3 = new Date();
-//            users2 = UserProto.User.parseFrom(s2);
-//            Date date4 = new Date();
-//            System.out.println("反序列化时间"+(date4.getTime()-date3.getTime()));
-//        } catch (InvalidProtocolBufferException e) {
-//            e.printStackTrace();
-//        }
-//    }
-
-    /**
-     * 处理反序列化时中文出现的八进制问题(属性值为中文时可能会出现这样的八进制\346\223\215\344\275\234\345\221\230)
-     * 可直接使用 protobuf 自带的 TextFormat.printToUnicodeString(message) 方法,但是这个方法过时了,直接从这个方法内部拿出来使用就可以了
-     *
-     * @param message 转换的 protobuf 对象
-     * @return string
-     */
-    public static String printToUnicodeString(MessageOrBuilder message) {
-        return TextFormat.printer().escapingNonAscii(false).printToString(message);
-    }
-
-}

+ 0 - 24
gateway/src/main/java/com/gyee/edge/gateway/protobuf/user.proto

@@ -1,24 +0,0 @@
-//使用 proto3 语法 ,未指定则使用proto2
-syntax = "proto3";
-
-// proto 文件包名
-package com.gyee.edge.gateway.protobuf;
-
-//生成 proto 文件所在包路径,一般来说是和文件包名一致就可以
-option java_package = "com.gyee.edge.gateway.protobuf";
-
-//生成 proto 的文件名
-option java_outer_classname="UserProto";
-
-
-//创建一个 User 对象
-message User{
-  // 定义简单的 Map string
-  map<string, Datas> map = 8;
-
-  message Datas {
-    int64 ts = 1;
-    double value = 2;
-  }
-}
-

+ 1 - 0
gateway/src/main/java/com/gyee/edge/gateway/restful/iohandler/HttpServerHandler.java

@@ -49,6 +49,7 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
         return thread;
     });
 
+    //返回客户端消息调用的方法
     @Override
     public void channelReadComplete(ChannelHandlerContext ctx) {
         ctx.flush();

+ 2 - 0
gateway/src/main/resources/application.yaml

@@ -10,5 +10,7 @@ spring:
     username:
     password:
 
+rocketmq:
+  namesrvaddr: 127.0.0.1:9876
 filePath: D:\\data.csv
 

+ 2 - 0
loader/build.gradle

@@ -15,6 +15,7 @@ apply plugin: "$bootGroup"
 apply plugin: 'io.spring.dependency-management'
 dependencies {
     implementation project(":common:utils")
+    implementation project(":protocol:gyfp2")
     implementation("$bootGroup:spring-boot-starter:$springBootVersion")
     implementation("com.google.code.gson:gson:$gsonVersion")
     implementation("org.apache.logging.log4j:log4j-core:$log4jVersion")
@@ -25,4 +26,5 @@ dependencies {
     implementation("com.google.protobuf:protobuf-java:$protoBuf")
     implementation("io.github.openfeign:feign-core:$openFeignVersion")
     implementation("io.github.openfeign:feign-jackson:$openFeignVersion")
+    implementation("org.xerial:sqlite-jdbc:$sqliteJdbc")
 }

+ 4 - 4
loader/src/main/java/com/gyee/edge/loader/ApplicationConsumerEventListener.java

@@ -1,6 +1,6 @@
 package com.gyee.edge.loader;
 
-import com.gyee.edge.loader.golden.DataWrite;
+import com.gyee.edge.loader.adapter.DataWrite;
 import com.gyee.edge.loader.rocketmq.comuser.BalanceComuser;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -15,15 +15,15 @@ public class ApplicationConsumerEventListener implements
         ApplicationListener<ApplicationReadyEvent> {
 
     @Autowired
-    private DataWrite dataWrite;
+    private BalanceComuser balanceComuser;
+
+
 
     @SneakyThrows
     @Override
     public void onApplicationEvent(ApplicationReadyEvent event) {
         log.info("ApplicationEvent  rised!");
         log.info("listener: " + event.toString());
-        BalanceComuser balanceComuser = new BalanceComuser();
-        balanceComuser.setWriteGolden(dataWrite);
         balanceComuser.DataConsumer();
     }
 

+ 237 - 0
loader/src/main/java/com/gyee/edge/loader/adapter/DataWrite.java

@@ -0,0 +1,237 @@
+package com.gyee.edge.loader.adapter;
+
+
+import com.gyee.edge.loader.data.GeneralTsData;
+import com.gyee.edge.loader.data.TsPointData;
+import com.gyee.edge.loader.mapper.DBPointMapper;
+
+import com.gyee.edge.loader.service.SqlService;
+
+import com.gyee.edge.loader.sqlite.ReadDBPointData;
+import com.gyee.protocol.gyfp2.message.GYMessage;
+import com.gyee.protocol.gyfp2.message.MessageData;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+
+/**
+ * @author :
+ * @date :Created in 2022/8/25 17:34
+ * @description:  写数据
+ */
+
+@Slf4j
+@Component
+public class DataWrite {
+    @Autowired
+    private SqlService sqlService;
+    
+    @Autowired
+    private ReadDBPointData readDBPointData;
+
+    public void DataWriteGloden(GYMessage gyMessage) throws Exception {
+        List<TsPointData> tsDataList = new ArrayList<>();
+        List<MessageData> messageDataList = gyMessage.getMessageDataList();
+        try {
+            if(gyMessage.getAttribute()==0){
+                for(MessageData messageData:messageDataList){
+                    TsPointData tsPointData = new TsPointData();
+                    GeneralTsData generalTsData = new GeneralTsData();
+                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(messageData.getDataKey());
+                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
+                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                        generalTsData.setTs(messageData.getTs());
+                        if (messageData.getDataType() == 0) {
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                        } else if (messageData.getDataType() == 10) {
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                        } else if (messageData.getDataType() == 8) {
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                        } else if (messageData.getDataType() == 11) {
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                        }
+                        generalTsData.setStatus((short) 1);
+                        tsPointData.setTsData(generalTsData);
+                        tsDataList.add(tsPointData);
+                    }
+                }
+            }else if(gyMessage.getAttribute()==1){
+                for(int i = 0; i < gyMessage.getMessageDataList().size();i++){
+                    TsPointData tsPointData = new TsPointData();
+                    GeneralTsData generalTsData = new GeneralTsData();
+                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(gyMessage.getMessageMutable().getKey() + i);
+                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
+                        MessageData messageData = gyMessage.getMessageDataList().get(i);
+                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                        generalTsData.setTs(messageData.getTs());
+                        if (messageData.getDataType() == 0) {
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                        } else if (messageData.getDataType() == 10) {
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                        } else if (messageData.getDataType() == 8) {
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                        } else if (messageData.getDataType() == 11) {
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                        }
+                        generalTsData.setStatus((short) 1);
+                        tsPointData.setTsData(generalTsData);
+                        tsDataList.add(tsPointData);
+                    }
+                }
+            }else if(gyMessage.getAttribute()==2){
+                byte messageType = gyMessage.getMessageMutable().getMessageType();
+                for(MessageData messageData:messageDataList){
+                    TsPointData tsPointData = new TsPointData();
+                    GeneralTsData generalTsData = new GeneralTsData();
+                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(messageData.getDataKey());
+                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
+                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                        generalTsData.setTs(messageData.getTs());
+                        if (messageType == 0) {
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                        } else if (messageType == 10) {
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                        } else if (messageType == 8) {
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                        } else if (messageType == 11) {
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                        }
+                        generalTsData.setStatus((short) 1);
+                        tsPointData.setTsData(generalTsData);
+                        tsDataList.add(tsPointData);
+                    }
+                }
+            }else if(gyMessage.getAttribute()==3){
+                byte messageType = gyMessage.getMessageMutable().getMessageType();
+                for(int i = 0; i < gyMessage.getMessageDataList().size();i++){
+                    TsPointData tsPointData = new TsPointData();
+                    GeneralTsData generalTsData = new GeneralTsData();
+                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(gyMessage.getMessageMutable().getKey()+ i);
+                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
+                        MessageData messageData = gyMessage.getMessageDataList().get(i);
+                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                        generalTsData.setTs(messageData.getTs());
+                        if (messageType == 0) {
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                        } else if (messageType == 10) {
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                        } else if (messageType == 8) {
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                        } else if (messageType == 11) {
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                        }
+                        generalTsData.setStatus((short) 1);
+                        tsPointData.setTsData(generalTsData);
+                        tsDataList.add(tsPointData);
+                    }
+                }
+            }else if(gyMessage.getAttribute()==4){
+                long ts = gyMessage.getMessageMutable().getTs();
+                for(MessageData messageData:messageDataList){
+                    TsPointData tsPointData = new TsPointData();
+                    GeneralTsData generalTsData = new GeneralTsData();
+                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(messageData.getDataKey());
+                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
+                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                        generalTsData.setTs(ts);
+                        if (messageData.getDataType() == 0) {
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                        } else if (messageData.getDataType() == 10) {
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                        } else if (messageData.getDataType() == 8) {
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                        } else if (messageData.getDataType() == 11) {
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                        }
+                        generalTsData.setStatus((short) 1);
+                        tsPointData.setTsData(generalTsData);
+                        tsDataList.add(tsPointData);
+                    }
+                }
+            }else if(gyMessage.getAttribute()==5){
+                long ts = gyMessage.getMessageMutable().getTs();
+                for(int i = 0; i < gyMessage.getMessageDataList().size();i++){
+                    TsPointData tsPointData = new TsPointData();
+                    GeneralTsData generalTsData = new GeneralTsData();
+                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(gyMessage.getMessageMutable().getKey() + i);
+                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
+                        MessageData messageData = gyMessage.getMessageDataList().get(i);
+                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                        generalTsData.setTs(ts);
+                        if (messageData.getDataType() == 0) {
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                        } else if (messageData.getDataType() == 10) {
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                        } else if (messageData.getDataType() == 8) {
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                        } else if (messageData.getDataType() == 11) {
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                        }
+                        generalTsData.setStatus((short) 1);
+                        tsPointData.setTsData(generalTsData);
+                        tsDataList.add(tsPointData);
+                    }
+                }
+            }else if(gyMessage.getAttribute()==6){
+                long ts = gyMessage.getMessageMutable().getTs();
+                byte messageType = gyMessage.getMessageMutable().getMessageType();
+                for(MessageData messageData:messageDataList){
+                    TsPointData tsPointData = new TsPointData();
+                    GeneralTsData generalTsData = new GeneralTsData();
+                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(messageData.getDataKey());
+                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
+                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                        generalTsData.setTs(ts);
+                        if (messageType == 0) {
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                        } else if (messageType == 10) {
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                        } else if (messageType == 8) {
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                        } else if (messageType == 11) {
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                        }
+                        generalTsData.setStatus((short) 1);
+                        tsPointData.setTsData(generalTsData);
+                        tsDataList.add(tsPointData);
+                    }
+                }
+            }else if(gyMessage.getAttribute()==7){
+                long ts = gyMessage.getMessageMutable().getTs();
+                byte messageType = gyMessage.getMessageMutable().getMessageType();
+                for(int i = 0;i < gyMessage.getMessageDataList().size();i++){
+                    TsPointData tsPointData = new TsPointData();
+                    GeneralTsData generalTsData = new GeneralTsData();
+                    MessageData messageData = gyMessage.getMessageDataList().get(i);
+                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(gyMessage.getMessageMutable().getKey() + i);
+                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0){
+                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                        generalTsData.setTs(ts);
+                        if(messageType == 0){
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                        }else if(messageType == 10){
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                        }else if(messageType == 8){
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                        }else if(messageType == 11){
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                        }
+                        generalTsData.setStatus((short) 1);
+                        tsPointData.setTsData(generalTsData);
+                        tsDataList.add(tsPointData);
+                    }
+                }
+            }
+            Date sTdate = new Date();
+            sqlService.saveDataSnaps(tsDataList);
+            Date enDate = new Date();
+            log.info("写入golden时间"+(enDate.getTime()-sTdate.getTime()));
+        }catch (Exception e){
+            log.error("datawrite发生异常",e);
+        }
+    }
+
+}

+ 0 - 11
loader/src/main/java/com/gyee/edge/loader/data/Datas.java

@@ -1,11 +0,0 @@
-package com.gyee.edge.loader.data;
-
-import lombok.Data;
-
-
-@Data
-public class Datas {
-    private long ts;
-    private double value;
-
-}

+ 8 - 14
loader/src/main/java/com/gyee/edge/loader/data/GeneralTsData.java

@@ -1,26 +1,20 @@
 package com.gyee.edge.loader.data;
 
-import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
 
-import java.util.Optional;
+import java.io.Serializable;
+
 
-/**
- * @author songwb<songwb@aliyun.com>
- */
 @Data
-@AllArgsConstructor
-@NoArgsConstructor
-public class GeneralTsData implements TsData {
+public class GeneralTsData implements Serializable {
 
     private long ts;
     private short status;
-    private Optional<Double> doubleValue;
-    private Optional<Long> longValue;
-    private Optional<Boolean> booleanValue;
-    private Optional<String> stringValue;
-    private Optional<String> blobValue;
+    private Double doubleValue;
+    private Long longValue;
+    private Boolean booleanValue;
+    private Float floatValue;
+
 
 
 }

+ 0 - 15
loader/src/main/java/com/gyee/edge/loader/data/TsData.java

@@ -1,15 +0,0 @@
-package com.gyee.edge.loader.data;
-
-/**
- * @author songwb<songwb@aliyun.com>
- */
-public interface TsData {
-
-    long getTs();
-
-    short getStatus();
-
-    //double getValue();
-
-}
-

+ 0 - 15
loader/src/main/java/com/gyee/edge/loader/data/TsDataType.java

@@ -1,15 +0,0 @@
-package com.gyee.edge.loader.data;
-
-/**
- * @author songwb<songwb@aliyun.com>
- */
-public enum TsDataType {
-    LONG,
-    DOUBLE,
-    BOOLEAN,
-    STRING,
-    BLOB,
-    COORDINATE
-
-}
-

+ 8 - 13
loader/src/main/java/com/gyee/edge/loader/data/TsPointData.java

@@ -1,28 +1,23 @@
 package com.gyee.edge.loader.data;
 
 
-import lombok.AllArgsConstructor;
+
 import lombok.Data;
-import lombok.NoArgsConstructor;
+
 
 import java.io.Serializable;
-import java.util.Optional;
 
-/**
- * @author songwb<songwb@aliyun.com>
- */
+
 @Data
-@AllArgsConstructor
-@NoArgsConstructor
 public class TsPointData implements Serializable {
 
     private String tagName;
     private GeneralTsData tsData;
 
-    public TsDataType findDataType() {
-//        if (tsData.getDoubleValue().isPresent()){
-//            return TsDataType.DOUBLE;
-//        }
+    /*public TsDataType findDataType() {
+        if (tsData.getDoubleValue().isPresent()){
+            return TsDataType.DOUBLE;
+        }
          if (tsData.getBooleanValue().isPresent()){
             return TsDataType.BOOLEAN;
         }
@@ -40,7 +35,7 @@ public class TsPointData implements Serializable {
 
 
         return TsDataType.DOUBLE;
-    }
+    }*/
     /**
      * 获取 double 类型值
      * @return

+ 0 - 13
loader/src/main/java/com/gyee/edge/loader/data/User.java

@@ -1,13 +0,0 @@
-package com.gyee.edge.loader.data;
-
-import lombok.Data;
-
-import java.util.List;
-import java.util.Map;
-
-
-@Data
-public class User {
-    private Map<String,Datas> map;
-
-}

+ 0 - 53
loader/src/main/java/com/gyee/edge/loader/golden/DataWrite.java

@@ -1,53 +0,0 @@
-package com.gyee.edge.loader.golden;
-
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.gyee.edge.loader.data.Datas;
-import com.gyee.edge.loader.data.GeneralTsData;
-import com.gyee.edge.loader.data.TsPointData;
-import com.gyee.edge.loader.data.User;
-import com.gyee.edge.loader.service.SqlService;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.*;
-
-@Slf4j
-@Component
-public class DataWrite {
-    @Autowired
-    private SqlService sqlService;
-
-    public void DataWriteGloden(String jsonObject) throws Exception {
-        List<TsPointData> tsDataList = new ArrayList<>();
-        try {
-            Date ts = new Date();
-            System.out.println("写入golden前时间"+(ts.getTime()));
-            //通过 proto Json 数据转 Java 对象
-            Gson GSON = new GsonBuilder().serializeNulls().create();
-            User user3 = GSON.fromJson(jsonObject, User.class);
-            Map<String, Datas> maps =  user3.getMap();
-            for (Map.Entry<String, Datas> entry:maps.entrySet()){
-                TsPointData tsPointData = new TsPointData();
-                GeneralTsData generalTsData = new GeneralTsData();
-                tsPointData.setTagName(entry.getKey());
-                generalTsData.setTs(entry.getValue().getTs());
-                double value = entry.getValue().getValue();
-                generalTsData.setDoubleValue(Optional.ofNullable(Double.valueOf(value)));
-                tsPointData.setTsData(generalTsData);
-                tsDataList.add(tsPointData);
-            }
-            sqlService.saveDataSnaps(tsDataList);
-            Date es = new Date();
-            System.out.println("写入golden后时间"+(ts.getTime()));
-            log.info("写入golden时间"+(es.getTime()-ts.getTime()));
-        }catch (Exception e){
-            log.error("转化map发生异常",e);
-        }
-    }
-
-}

+ 21 - 0
loader/src/main/java/com/gyee/edge/loader/mapper/DBPointMapper.java

@@ -0,0 +1,21 @@
+package com.gyee.edge.loader.mapper;
+
+import lombok.Data;
+
+/**
+ * @author :
+ * @date :Created in 2022/8/25 17:34
+ * @description:  gyfp2协议与写入数据配置实体类
+ */
+
+@Data
+public class DBPointMapper {
+    private int gyfp2_address;
+    private int gyfp2_valid;
+    private float gyfp2_coeff;
+    private float gyfp2_base;
+    private int point_id;
+    private int point_type;
+    private String point_key;
+    private int point_ratio;
+}

Разница между файлами не показана из-за своего большого размера
+ 0 - 1455
loader/src/main/java/com/gyee/edge/loader/protobuf/UserProto.java


+ 0 - 24
loader/src/main/java/com/gyee/edge/loader/protobuf/user.proto

@@ -1,24 +0,0 @@
-//使用 proto3 语法 ,未指定则使用proto2
-syntax = "proto3";
-
-// proto 文件包名
-package com.gyee.edge.gateway.protobuf;
-
-//生成 proto 文件所在包路径,一般来说是和文件包名一致就可以
-option java_package = "com.gyee.edge.gateway.protobuf";
-
-//生成 proto 的文件名
-option java_outer_classname="UserProto";
-
-
-//创建一个 User 对象
-message User{
-  // 定义简单的 Map string
-  map<string, Datas> map = 8;
-
-  message Datas {
-    int64 ts = 1;
-    double value = 2;
-  }
-}
-

+ 44 - 42
loader/src/main/java/com/gyee/edge/loader/rocketmq/comuser/BalanceComuser.java

@@ -1,9 +1,9 @@
 package com.gyee.edge.loader.rocketmq.comuser;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-import com.gyee.edge.common.utils.util.ProtoJsonUtil;
-import com.gyee.edge.loader.golden.DataWrite;
-import com.gyee.edge.loader.protobuf.UserProto;
+import com.gyee.edge.loader.adapter.DataWrite;
+
+import com.gyee.protocol.gyfp2.message.GYMessage;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -11,6 +11,8 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import java.util.Date;
@@ -20,48 +22,48 @@ import java.util.List;
 @Component
 public class BalanceComuser {
 
+    @Value("${rocketmq.namesrvaddr}")
+    String nameSrvAddr;
+
+    @Autowired
     private DataWrite dataWrite;
-        public void DataConsumer() throws Exception {
-            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_sonsumer");
-            consumer.setNamesrvAddr("127.0.0.1:9876");
-            consumer.subscribe("keyMessage", "*");
-            consumer.setMessageModel(MessageModel.CLUSTERING);
-            consumer.registerMessageListener(new MessageListenerConcurrently() {
-                @Override
-                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
-                    try {
-                        for (MessageExt msg : msgs) {
-                            String topic = msg.getTopic();
-                            //String msgBody = new String(msg.getBody(), "utf-8");
-                            String tags = msg.getTags();
-                            try {
-                                //反序列化
-                                Date st = new Date();
-                                System.out.println("反序列化前时间"+(st.getTime()));
-                                UserProto.User user1 = UserProto.User.parseFrom(msg.getBody());
-                                String jsonObject = ProtoJsonUtil.toJson(user1);
-                                log.info("收到消息" + "topic" + topic + ",tags" + tags + ",msg" + user1);
-                                //数据写入
-                                Date et = new Date();
-                                System.out.println("反序列化后时间"+(et.getTime()));
-                                System.out.println("序列化时间"+(et.getTime()-st.getTime()));
-                                dataWrite.DataWriteGloden(jsonObject);
-                            } catch (InvalidProtocolBufferException e) {
-                                e.printStackTrace();
-                            }
+
+    public void DataConsumer() throws Exception {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_sonsumer");
+        consumer.setNamesrvAddr(nameSrvAddr);
+        consumer.subscribe("keyMessage", "*");
+        consumer.setMessageModel(MessageModel.CLUSTERING);
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+                try {
+                    for (MessageExt msg : msgs) {
+                        String topic = msg.getTopic();
+                        String msgBody = new String(msg.getBody(), "utf-8");
+                        String tags = msg.getTags();
+                        try {
+                            //反序列化
+                            byte[] bytes = msg.getBody();
+                            Date sTdate = new Date();
+                            GYMessage gyMessage = GYMessage.toObject(bytes);
+                            Date eNdate = new Date();
+                            log.info("反序列化时间"+(eNdate.getTime()-sTdate.getTime()));
+                            log.info("收到消息" + "topic" + topic + ",tags" + tags + ",msg" + gyMessage);
+                            //数据写入
+                            dataWrite.DataWriteGloden(gyMessage);
+                        } catch (InvalidProtocolBufferException e) {
+                            e.printStackTrace();
                         }
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                     }
-                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                 }
-            });
-            consumer.start();  //启动消费者
-            log.info("Consumer STARTTED");
-        }
-
-    public void setWriteGolden(DataWrite dataWrite) {
-        this.dataWrite = dataWrite;
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.start();  //启动消费者
+        log.info("Consumer STARTTED");
     }
+
 }

+ 2 - 3
loader/src/main/java/com/gyee/edge/loader/service/WriteDataService.java

@@ -1,7 +1,6 @@
 package com.gyee.edge.loader.service;
 
 
-import com.gyee.edge.loader.data.TsData;
 import com.gyee.edge.loader.data.TsPointData;
 import feign.Headers;
 import feign.Param;
@@ -17,6 +16,6 @@ public interface WriteDataService {
     void saveTspointDatas(List<TsPointData> tsDataList);
 
 
-    @RequestLine("GET /latest?keys={keys}")
-    Map<String, TsData> getData(@Param(value = "keys")String tagName);
+    /*@RequestLine("GET /latest?keys={keys}")
+    Map<String, TsData> getData(@Param(value = "keys")String tagName);*/
 }

+ 74 - 0
loader/src/main/java/com/gyee/edge/loader/sqlite/ReadDBPointData.java

@@ -0,0 +1,74 @@
+package com.gyee.edge.loader.sqlite;
+
+
+import com.gyee.edge.loader.mapper.DBPointMapper;
+import com.gyee.edge.loader.sqlite.config.Database;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@Component
+public class ReadDBPointData {
+    @Autowired
+    private Database db;
+
+    public List<DBPointMapper> selectAllData() {
+        List<DBPointMapper> dbPointMappers = new ArrayList<>();
+        try {
+            Connection conn = db.getConnection();
+            String sql = "select * from dbpointmapper";
+            PreparedStatement ps = conn.prepareStatement(sql);
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                DBPointMapper dbPointMapper = new DBPointMapper();
+                dbPointMapper.setGyfp2_address(rs.getInt("gyfp2_address"));
+                dbPointMapper.setGyfp2_valid(rs.getInt("gyfp2_valid"));
+                dbPointMapper.setGyfp2_coeff(rs.getFloat("gyfp2_coeff"));
+                dbPointMapper.setGyfp2_base(rs.getFloat("gyfp2_base"));
+                dbPointMapper.setPoint_id(rs.getInt("point_id"));
+                dbPointMapper.setPoint_type(rs.getInt("point_type"));
+                dbPointMapper.setPoint_key(rs.getString("point_key"));
+                dbPointMapper.setPoint_ratio(rs.getInt("point_ratio"));
+                dbPointMappers.add(dbPointMapper);
+            }
+            db.releaseResource();
+        } catch (Exception e) {
+            log.info("sqlite查询失败",e);
+            e.printStackTrace();
+        }
+        return dbPointMappers;
+    }
+
+    public DBPointMapper selectBykey(int gyfp2_address) {
+        DBPointMapper dbPointMapper = new DBPointMapper();
+        try {
+            Connection conn = db.getConnection();
+            String sql = "select * from dbpointmapper where gyfp2_address = ?";
+            PreparedStatement ps = conn.prepareStatement(sql);
+            ps.setInt(1,gyfp2_address);
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                dbPointMapper.setGyfp2_address(rs.getInt("gyfp2_address"));
+                dbPointMapper.setGyfp2_valid(rs.getInt("gyfp2_valid"));
+                dbPointMapper.setGyfp2_coeff(rs.getFloat("gyfp2_coeff"));
+                dbPointMapper.setGyfp2_base(rs.getFloat("gyfp2_base"));
+                dbPointMapper.setPoint_id(rs.getInt("point_id"));
+                dbPointMapper.setPoint_type(rs.getInt("point_type"));
+                dbPointMapper.setPoint_key(rs.getString("point_key"));
+                dbPointMapper.setPoint_ratio(rs.getInt("point_ratio"));
+            }
+            db.releaseResource();
+        } catch (Exception e) {
+            log.info("sqlite查询失败",e);
+            e.printStackTrace();
+        }
+        return dbPointMapper;
+    }
+}

+ 364 - 0
loader/src/main/java/com/gyee/edge/loader/sqlite/config/DB.java

@@ -0,0 +1,364 @@
+package com.gyee.edge.loader.sqlite.config;
+
+import java.io.Reader;
+import java.io.StringReader;
+import java.sql.*;
+import java.text.MessageFormat;
+import java.util.*;
+
+public class DB {
+    //数据库驱动,默认为Oracle
+    public static String JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver";
+
+    //数据库连接地址
+    public static String DB_URL = "";
+
+    public static String USER = "";
+
+    public static String PASS = "";
+
+    Connection CONN = null;
+
+    //PreparedStatement PSTM = null;
+
+    public DB(String url, String user, String pass) {
+        DB_URL = url;
+        USER = user;
+        PASS = pass;
+    }
+
+    public DB() {
+        /*Properties props = new Properties();
+        try {
+            String configFilePath = System.getProperty("antgis.web.oa.root")+"/WEB-INF/config.properties";
+            props.load(new FileInputStream(configFilePath));
+        }catch (FileNotFoundException e){
+
+        }catch (IOException e){
+
+        }*/
+
+
+//        JDBC_DRIVER = Config.DB_DRIVER;
+//        DB_URL = Config.DB_URL;
+//        USER = Config.DB_USER;
+//        PASS = Config.DB_PWD;
+    }
+
+    public Connection getConnection() {
+        try {
+            Class.forName(JDBC_DRIVER);
+            CONN = DriverManager.getConnection(DB_URL, USER, PASS);
+        } catch (ClassNotFoundException e) {
+
+        } catch (SQLException e) {
+
+        }
+        return CONN;
+    }
+
+    public ResultSet getResultSet(String sql) {
+        ResultSet resultSet = null;
+        //PreparedStatement pstm = null;
+        try {
+            CONN = getConnection();
+            PreparedStatement PSTM = CONN.prepareStatement(sql);
+            resultSet = PSTM.executeQuery();
+            return resultSet;
+        } catch (SQLException e) {
+            return resultSet;
+        } finally {
+
+        }
+    }
+
+    public void close() {
+        /*if(PSTM !=null) {
+            try {
+                PSTM.close();
+            } catch (SQLException e) {
+                e.printStackTrace();
+            }
+        }*/
+
+        try {
+            if (CONN != null && (!CONN.isClosed())) {
+                try {
+                    CONN.close();
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                }
+            }
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public List<Map<String, Object>> getList(String sql) {
+        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
+        ResultSet rs = getResultSet(sql);
+        list = convertToList(rs);
+        return list;
+    }
+
+    public List<Map<String, Object>> convertToList(ResultSet rs) {
+        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
+        try {
+            ResultSetMetaData md = rs.getMetaData(); //获得结果集结构信息,元数据
+            int columnCount = md.getColumnCount();   //获得列数
+            while (rs.next()) {
+                Map<String, Object> rowData = new HashMap<String, Object>();
+                for (int i = 1; i <= columnCount; i++) {
+                    rowData.put(md.getColumnName(i), rs.getObject(i));
+                }
+                list.add(rowData);
+            }
+        } catch (SQLException e) {
+
+        }
+
+        return list;
+    }
+
+    public ResultSet getResultSet(String tableName, String filter) {
+        String sql = MessageFormat.format("select * from {0} where {1}", tableName, filter);
+        return getResultSet(sql);
+    }
+
+    public ResultSet getResultSet(String tableName, String columnList, String filter) {
+        String sql = MessageFormat.format("select {2} from {0} where {1}", tableName, filter, columnList);
+        return getResultSet(sql);
+    }
+
+    public ResultSet getResultSet(String tableName, String columnList, String filter, String orderColumnList) {
+        String sql = MessageFormat.format("select {2} from {0} where {1} order by {3}", tableName, filter, columnList, orderColumnList);
+        return getResultSet(sql);
+    }
+
+    public Object getFirstColumn(String sql) {
+        ResultSet rs = getResultSet(sql);
+        try {
+            if (rs.next()) {
+                return rs.getObject(0);
+            }
+        } catch (SQLException e) {
+
+        }
+        return null;
+    }
+
+    public String getSqlForPageSize(String tableName, Integer pageSize, Integer pageIndex, String where, String order) {
+        return getSqlForPageSize(tableName, "*", pageSize, pageIndex, where, order);
+    }
+
+    public String getSqlForPageSize(String tableName, String sColumnList, int pageSize, int pageIndex, String where, String order) {
+        String sSql = "";
+        StringBuilder sb = new StringBuilder();
+        sb.append("select " + sColumnList + " from (select t.*,rownum rno ");
+        sb.append(" from (select * from " + tableName + " ");
+        sb.append(" where " + where);
+        sb.append(order);
+        sb.append(") t where rownum <= ");
+        sb.append((pageIndex + 1) * pageSize);
+        sb.append(" )");
+        sb.append(" where rno >= ");
+        sb.append(pageIndex * pageSize + 1);
+        sSql = sb.toString();
+        return sSql;
+    }
+
+    public boolean judgeRecordExist(String tableName, String filter) {
+        ResultSet rs = getResultSet(tableName, filter);
+        try {
+            if (rs.next())
+                return true;
+            else
+                return false;
+        } catch (SQLException e) {
+            return false;
+        }
+    }
+
+    public String formatTableName(String tableName) {
+        return tableName.replace("\"", "");
+    }
+
+    public boolean judgeTableOrViewExist(String tableName) {
+        String sql = "select count(*) from user_objects where object_type in ('TABLE','VIEW') AND upper(OBJECT_NAME)=upper('" + formatTableName(tableName) + "')";
+        ResultSet rs = getResultSet(sql);
+        return judgeCountValue(rs);
+    }
+
+    public boolean judgeColumnExist(String tableName, String columnName) {
+        String sql = "select count(*) from user_tab_cols where upper(table_name)=upper('" + formatTableName(tableName) + "') and upper(COLUMN_NAME)=upper('" + columnName + "') order by COLUMN_ID";
+        ResultSet rs = getResultSet(sql);
+        return judgeCountValue(rs);
+    }
+
+    private boolean judgeCountValue(ResultSet rs) {
+        try {
+            if (rs.next()) {
+                if (Integer.parseInt(rs.getObject(0).toString()) == 1)
+                    return true;
+                else
+                    return false;
+            } else {
+                return false;
+            }
+        } catch (SQLException e) {
+            return false;
+        }
+    }
+
+    public HashMap getTableColumnType(String tableName) {
+        String sql = "select COLUMN_NAME,DATA_TYPE,DATA_LENGTH,COLUMN_ID from user_tab_cols where upper(table_name)=upper('" + formatTableName(tableName) + "') order by COLUMN_ID";
+        ResultSet rs = getResultSet(sql);
+        HashMap hm = new HashMap();
+        try {
+            while (rs.next()) {
+                String name = rs.getString("COLUMN_NAME");
+                String value = rs.getString("DATA_TYPE");
+                hm.put(name, value);
+            }
+        } catch (SQLException e) {
+
+        }
+
+        return hm;
+    }
+
+    public boolean add(String tableName, LinkedHashMap<String, Object> hm) throws Exception {
+        if (tableName == null || tableName.equals("")) {
+            throw new Exception("表名不能为空字符串!");
+        }
+
+        String sColumns = "";
+        String sValues = "";
+
+        Iterator iter = hm.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry entry = (Map.Entry) iter.next();
+            sColumns += entry.getKey().toString() + ",";
+            sValues += "?,";
+        }
+        /*for(String key:hm.keySet()){
+            sColumns += key +",";
+            sValues += "?,";
+        }*/
+
+        sColumns = sColumns.substring(0, sColumns.length() - 1);
+        sValues = sValues.substring(0, sValues.length() - 1);
+        String sql = MessageFormat.format("insert into {0} ({1}) values({2})", tableName, sColumns, sValues);
+
+        return executeSql(sql, tableName, hm);
+    }
+
+    public boolean update(String tableName, LinkedHashMap<String, Object> hm, String filterColumns) throws Exception {
+        if (tableName == null || tableName.equals("")) {
+            throw new Exception("表名不能为空字符串!");
+        }
+
+        String filter = getFilterString(tableName, hm, filterColumns);
+
+        return update2(tableName, hm, filter);
+    }
+
+    public boolean update2(String tableName, LinkedHashMap<String, Object> hm, String filter) throws Exception {
+        if (tableName == null || tableName.equals("")) {
+            throw new Exception("表名不能为空字符串!");
+        }
+
+        String str = "";
+        Iterator iter = hm.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry entry = (Map.Entry) iter.next();
+            str += entry.getKey().toString() + "=?,";
+        }
+
+        str = str.substring(0, str.length() - 1);
+        String sql = MessageFormat.format("update {0} set {1} where {2}", tableName, str, filter);
+        return executeSql(sql, tableName, hm);
+    }
+
+    private String getFilterString(String tableName, LinkedHashMap<String, Object> hm, String filterColumns) {
+        String[] arr = filterColumns.split(",");
+        int length = arr.length;
+        HashMap columnsType = getTableColumnType(tableName);
+        String filter = "";
+        for (int ii = 0; ii < length; ii++) {
+            String columnType = columnsType.get(arr[ii]).toString();
+            switch (columnType) {
+                case "NUMBER":
+                    filter += "and " + arr[ii] + "=" + hm.get(arr[ii]) + " ";
+                    break;
+                case "DATE":
+                    filter += "and " + arr[ii] + "=to_date('" + hm.get(arr[ii]).toString() + "','yyyy-mm-dd hh24:mi:ss') ";
+                    break;
+                default:
+                    filter += "and " + arr[ii] + "='" + hm.get(arr[ii]).toString() + "' ";
+                    break;
+
+            }
+        }
+        filter = filter.substring(3);
+        return filter;
+    }
+
+
+    public boolean executeSql(String sql, String tableName, LinkedHashMap<String, Object> hm) {
+        boolean bl = false;
+        int i = 0;
+        try {
+            CONN = getConnection();
+            PreparedStatement pstm = CONN.prepareStatement(sql);
+            HashMap columnsType = getTableColumnType(tableName);
+
+            Iterator iter = hm.entrySet().iterator();
+            int index = 0;
+            while (iter.hasNext()) {
+                index += 1;
+                Map.Entry entry = (Map.Entry) iter.next();
+                String key = entry.getKey().toString();
+                Object val = entry.getValue();
+                String columnType = columnsType.get(key).toString();
+                switch (columnType) {
+                    case "NVARCHAR2":
+                    case "NCHAR":
+                    case "VARCHAR2":
+                    case "CHAR":
+                    case "NUMBER":
+                        pstm.setString(index, val.toString());
+                        break;
+                    case "DATE":
+                        Timestamp dateTime = Timestamp.valueOf(val.toString());
+                        pstm.setTimestamp(index, dateTime);
+                        break;
+                    /*case "BLOB":
+                        PSTM.setBlob(index,oracle.sql.BLOB.getEmptyBLOB());
+
+                        break;*/
+                    case "CLOB":
+                        String str = val.toString();
+                        Reader clobReader = new StringReader(str); // 将 text转成流形式
+                        pstm.setClob(index, clobReader, str.length());
+                        break;
+                    default:
+                        pstm.setString(index, val.toString());
+                        break;
+
+                }
+            }
+
+            i = pstm.executeUpdate();
+            if (i > 0)
+                bl = true;
+
+
+        } catch (SQLException e) {
+            String msg = e.getMessage();
+        }
+
+
+        return bl;
+    }
+}

+ 62 - 0
loader/src/main/java/com/gyee/edge/loader/sqlite/config/Database.java

@@ -0,0 +1,62 @@
+package com.gyee.edge.loader.sqlite.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.sql.*;
+
+@Component
+public class Database {
+
+    @Value("${spring.datasource.driver-class-name}")
+    private String driver;
+
+    @Value("${spring.datasource.url}")
+    private String url;
+
+    Connection connection = null;
+
+    PreparedStatement preparedStatement = null;
+    ResultSet rs = null;
+
+
+    public Connection getConnection() throws Exception {
+        try{
+            Class.forName(driver);
+            connection = DriverManager.getConnection(url);
+            System.out.println("数据库连接成功");
+
+        }catch (Exception e){
+            throw new Exception(e.getMessage());
+        }
+
+        return connection;
+    }
+
+    //释放资源
+    public void releaseResource(){
+        if(rs!=null){
+            try{
+                rs.close();
+            }catch (SQLException e){
+                e.printStackTrace();
+            }
+        }
+
+        if(preparedStatement!=null){
+            try{
+                preparedStatement.close();
+            }catch (SQLException e){
+                e.printStackTrace();
+            }
+        }
+
+        if(connection!=null){
+            try{
+                connection.close();
+            }catch (SQLException e){
+                e.printStackTrace();
+            }
+        }
+    }
+}

+ 9 - 0
loader/src/main/resources/application.yaml

@@ -3,5 +3,14 @@ server:
 spring:
   application:
   name: loader
+  datasource:
+    driver-class-name: org.sqlite.JDBC
+    #    F:\\colud_ideaspace\\edge\\gateway\\src\\main\\resources\\myDb
+    url: jdbc:sqlite::resource:dbpointmapper.sqlite3
+    username:
+    password:
+
+rocketmq:
+  namesrvaddr: 127.0.0.1:9876
 
 adapterUrl: http://127.0.0.1:8011/ts

BIN
loader/src/main/resources/dbpointmapper.sqlite3


+ 17 - 0
protocol/gyfp2/build.gradle

@@ -0,0 +1,17 @@
+buildscript {
+    repositories {
+        mavenLocal()
+        maven {
+            allowInsecureProtocol = true
+            url "http://maven.aliyun.com/nexus/content/groups/public" }
+        mavenCentral()
+    }
+    dependencies {
+        classpath("$bootGroup:spring-boot-gradle-plugin:$springBootVersion")
+    }
+}
+
+dependencies {
+    implementation project(":common:utils")
+    implementation("io.netty:netty-all:$nettyVersion")
+}

+ 73 - 0
protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/GYMessage.java

@@ -0,0 +1,73 @@
+package com.gyee.protocol.gyfp2.message;
+
+import com.gyee.edge.common.utils.BytesUtils;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * @author :
+ * @date :Created in 2022/8/25 17:34
+ * @description:  定义gyfp2协议实体类
+ */
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class GYMessage implements Serializable {
+    private byte[] header=new byte[]{'G','Y','F','P','V','2','0','0'};
+    private short length;
+    private short count;
+    private int attribute;
+    private List<MessageData> messageDataList;
+    private MessageMutable messageMutable;
+
+
+
+     public byte[] toBytes(){
+          /*不指定默认256,指定64以内默认大小都是64.
+         如果写入后数据大小未超过512个字节,则选择下一个16的整数倍进行扩容。比如写入数据后大小为12,则扩容后的capacity是16。
+         如果写入后数据大小超过512个字节,则选择下一个2n。比如写入后大小是512字节,则扩容后的capacity是210=1024。(因为29=512,长度已经不够了)
+         扩容不能超过max capacity,否则会报错*/
+         int sum = 0;
+         ByteBuf buffer = Unpooled.buffer(10);
+         buffer.writeBytes(getHeader());
+         //占住总长度字节大小,方便后续代替
+         buffer.writeShort(1);
+         //占住数据项个数大小,方便后续代替
+         buffer.writeShort(1);
+         buffer.writeInt(getAttribute());
+         if (getAttribute() > 0){
+             buffer.writeBytes(getMessageMutable().toBytes(getAttribute()));
+         }
+         for (MessageData messageData : getMessageDataList()){
+             byte[] bytes = messageData.toBytes(getAttribute(),getMessageMutable().getMessageType());
+             sum++;
+             buffer.writeBytes(bytes);
+         }
+         //替换长度和数据项字节码
+         ByteBufUtil.setShortBE(buffer,getHeader().length,ByteBufUtil.getBytes(buffer).length);
+         ByteBufUtil.setShortBE(buffer,getHeader().length+2,sum);
+         return ByteBufUtil.getBytes(buffer);
+     }
+
+     //字节数组转对象
+     public static GYMessage toObject(byte[] bytes){
+         GYMessage gyMessage = new GYMessage();
+         if (bytes.length>0){
+             gyMessage.setLength(BytesUtils.getShort(bytes,8));
+             gyMessage.setCount(BytesUtils.getShort(bytes,10));
+             gyMessage.setAttribute(BytesUtils.getInt(bytes,12));
+             gyMessage.setMessageMutable(MessageMutable.toMessageMutable(bytes,BytesUtils.getInt(bytes,12)));
+             gyMessage.setMessageDataList(MessageData.toMessageData(bytes,BytesUtils.getInt(bytes,12)));
+         }
+         return gyMessage;
+     }
+
+}

+ 307 - 0
protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/MessageData.java

@@ -0,0 +1,307 @@
+package com.gyee.protocol.gyfp2.message;
+
+import com.gyee.edge.common.utils.BytesUtils;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class MessageData implements Serializable {
+    private int dataKey;
+    private long ts;
+    private byte dataType;
+
+    private Optional<Boolean> GYFP2_TYPE_BOOLEAN ; //Boolean 类型 0
+    private Optional<Float>  GYFP2_TYPE_FLOAT32;  //32 位浮点数 10
+    private Optional<Long> GYFP2_TYPE_INT64;  //无符号 64 位整数 8
+    private Optional<Double> GYFP2_TYPE_FLOAT64; //64 位浮点数 11
+    /*private byte GYFP2_TYPE_INT8;  //有符号 8 位整数 1
+    private byte GYFP2_TYPE_UINT8; //无符号 8 位整数 2
+    private short GYFP2_TYPE_INT16; //有符号 16 位整数 3
+    private char GYFP2_TYPE_UINT16;  //无符号 16 位整数 4
+    private float GYFP2_TYPE_INT32; //有符号 32 位整数 5
+    private int GYFP2_TYPE_UINT32; //无符号 32 位整数 6
+    private long GYFP2_TYPE_UINT64; //无符号 32 位整数 7
+    private char GYFP2_TYPE_FLOAT16; //16 位浮点数 9
+    private String GYFP2_TYPE_STRING;  //字符串类型 12
+    private boolean GYFP2_TYPE_BIT;  //位 15  */
+
+
+    public byte[] ValuetoBytes(byte type){
+        ByteBuf buffer = Unpooled.buffer(10);
+        if (type==0){
+            buffer.writeBoolean(GYFP2_TYPE_BOOLEAN.get());
+        }else if(type==10){
+            buffer.writeFloat(getGYFP2_TYPE_FLOAT32().get());
+        }else if(type==8) {
+            buffer.writeLong(getGYFP2_TYPE_INT64().get());
+        }else if(type==11){
+            buffer.writeDouble(getGYFP2_TYPE_FLOAT64().get());
+        /*else if(type==1){
+            buffer.writeByte(getGYFP2_TYPE_INT8());
+        }else if(type==2){
+            buffer.writeByte(getGYFP2_TYPE_UINT8());
+        }else if(type==3){
+            buffer.writeByte(getGYFP2_TYPE_INT16());
+        }else if(type==4){
+            buffer.writeByte(getGYFP2_TYPE_UINT16());
+        }else if(type==5){
+            buffer.writeFloat(getGYFP2_TYPE_INT32());
+        }else if(type==6){
+            buffer.writeInt(getGYFP2_TYPE_UINT32());
+        }else if(type==7){
+            buffer.writeLong(getGYFP2_TYPE_INT64());
+        }else if(type==8){
+            buffer.writeLong(getGYFP2_TYPE_UINT64());
+        }else if(type==9){
+            buffer.writeChar(getGYFP2_TYPE_FLOAT16());
+        }else if(type==11){
+            buffer.writeDouble(getGYFP2_TYPE_FLOAT64());
+        }else if(type==15){
+            buffer.writeBoolean(isGYFP2_TYPE_BIT());
+        }*/
+        }
+        byte[] bytes = ByteBufUtil.getBytes(buffer);
+        return bytes;
+    }
+
+    //MessageData对象转字节数组   messagetype决定实际长度类型
+    // mutable中有值则采用mutable中type的值,mutable中无值则采用messagedata中的datatype。
+    public byte[] toBytes(int attribute,byte messagetype){
+        ByteBuf buffer = Unpooled.buffer(10);
+        if (attribute==0){
+            buffer.writeByte(getDataType());
+            buffer.writeBytes(BytesUtils.intToByte2(getDataKey()));
+            buffer.writeLong(getTs());
+            buffer.writeBytes(ValuetoBytes(getDataType()));
+        }else if(attribute==1){
+            buffer.writeByte(getDataType());
+            buffer.writeLong(getTs());
+            buffer.writeBytes(ValuetoBytes(getDataType()));
+        }else if(attribute==2) {
+            buffer.writeBytes(BytesUtils.intToByte2(getDataKey()));
+            buffer.writeLong(getTs());
+            buffer.writeBytes(ValuetoBytes(messagetype));
+        }else if(attribute==3){
+            buffer.writeLong(getTs());
+            buffer.writeBytes(ValuetoBytes(messagetype));
+        }else if(attribute==4){
+            buffer.writeByte(getDataType());
+            buffer.writeBytes(BytesUtils.intToByte2(getDataKey()));
+            buffer.writeBytes(ValuetoBytes(getDataType()));
+        }else if (attribute==5){
+            buffer.writeByte(getDataType());
+            buffer.writeBytes(ValuetoBytes(getDataType()));
+        }else if (attribute==6){
+            buffer.writeBytes(BytesUtils.intToByte2(getDataKey()));
+            buffer.writeBytes(ValuetoBytes(messagetype));
+        }else if (attribute==7){
+            buffer.writeBytes(ValuetoBytes(messagetype));
+        }
+        byte[] bytes = ByteBufUtil.getBytes(buffer);
+        return bytes;
+    }
+
+
+    //字节数组转MessageData对象
+    public static List<MessageData> toMessageData(byte[] bytes, int attribute) {
+        List<MessageData> messageDataList = new ArrayList<>();
+        //获取data块字节大小
+        if(attribute==0) {
+            int count = bytes.length - 16;
+            byte[] bytes1 = new byte[count];
+            System.arraycopy(bytes, 16, bytes1, 0, count);
+            for (int i = 0; i < bytes1.length;) {
+                MessageData messageData = new MessageData();
+                messageData.setDataType(bytes1[i]);
+                messageData.setDataKey(BytesUtils.BytesToInt(bytes1,i+1));
+                messageData.setTs(BytesUtils.getLong2(bytes1,i+4));
+                if (bytes1[i]==0){
+                    messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+12));
+                    i = i + 13;
+                }else if (bytes1[i]==10){
+                    messageData.setGYFP2_TYPE_FLOAT32(BytesUtils.getFloat(bytes1,i+12));
+                    i = i + 16;
+                }else if(bytes1[i]==8){
+                    messageData.setGYFP2_TYPE_INT64(BytesUtils.getLong(bytes1,i+12));
+                    i = i + 20;
+                }else if(bytes1[i]==11){
+                    messageData.setGYFP2_TYPE_FLOAT64(BytesUtils.getDouble(bytes1,i+12));
+                    i = i + 20;
+                }
+                messageDataList.add(messageData);
+            }
+        } else if(attribute==1){
+            int count = bytes.length - 19;
+            byte[] bytes1 = new byte[count];
+            System.arraycopy(bytes, 19, bytes1, 0, count);
+            for (int i = 0; i < bytes1.length;) {
+                MessageData messageData = new MessageData();
+                messageData.setDataType(bytes1[i]);
+                messageData.setTs(BytesUtils.getLong2(bytes1,i+1));
+                if (bytes1[i]==0){
+                    messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+8));
+                    i = i + 9;
+                }else if (bytes1[i]==10){
+                    messageData.setGYFP2_TYPE_FLOAT32(BytesUtils.getFloat(bytes1,i+8));
+                    i = i + 12;
+                }else if(bytes1[i]==8){
+                    messageData.setGYFP2_TYPE_INT64(BytesUtils.getLong(bytes1,i+8));
+                    i = i + 16;
+                }else if(bytes1[i]==11){
+                    messageData.setGYFP2_TYPE_FLOAT64(BytesUtils.getDouble(bytes1,i+8));
+                    i = i + 16;
+                }
+                messageDataList.add(messageData);
+            }
+        }else if(attribute==2){
+            byte[] bytes2 = new byte[1];
+            System.arraycopy(bytes, 16, bytes2, 0, 1);
+            byte[] bytes1 = new byte[bytes.length-(16 + bytes2.length)];
+            System.arraycopy(bytes, 16 + bytes2.length, bytes1, 0, bytes1.length);
+            for (int i = 0; i < bytes1.length;) {
+                MessageData messageData = new MessageData();
+                messageData.setDataKey(BytesUtils.BytesToInt(bytes1,i));
+                messageData.setTs(BytesUtils.getLong2(bytes1,i+3));
+                if (bytes2[0]==0){
+                    messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+11));
+                    i = i + 12;
+                }else if (bytes2[0]==10){
+                    messageData.setGYFP2_TYPE_FLOAT32(BytesUtils.getFloat(bytes1,i+11));
+                    i = i + 15;
+                }else if(bytes2[0]==8){
+                    messageData.setGYFP2_TYPE_INT64(BytesUtils.getLong(bytes1,i+11));
+                    i = i + 19;
+                }else if(bytes2[0]==11){
+                    messageData.setGYFP2_TYPE_FLOAT64(BytesUtils.getDouble(bytes1,i+11));
+                    i = i + 19;
+                }
+                messageDataList.add(messageData);
+            }
+        }else if(attribute==3){
+            byte[] bytes2 = new byte[4];
+            System.arraycopy(bytes, 16, bytes2, 0, 4);
+            byte[] bytes1 = new byte[bytes.length-(16 + bytes2.length)];
+            System.arraycopy(bytes, 16 + bytes2.length, bytes1, 0, bytes1.length);
+            for (int i = 0; i < bytes1.length;) {
+                MessageData messageData = new MessageData();
+                messageData.setTs(BytesUtils.getLong2(bytes1,i));
+                if (bytes2[0]==0){
+                    messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+8));
+                    i = i + 9;
+                }else if (bytes2[0]==10){
+                    messageData.setGYFP2_TYPE_FLOAT32(BytesUtils.getFloat(bytes1,i+8));
+                    i = i + 12;
+                }else if(bytes2[0]==8){
+                    messageData.setGYFP2_TYPE_INT64(BytesUtils.getLong(bytes1,i+8));
+                    i = i + 16;
+                }else if(bytes2[0]==11){
+                    messageData.setGYFP2_TYPE_FLOAT64(BytesUtils.getDouble(bytes1,i+8));
+                    i = i + 16;
+                }
+                messageDataList.add(messageData);
+            }
+        }else if(attribute==4){
+            int count = bytes.length - 24;
+            byte[] bytes1 = new byte[count];
+            System.arraycopy(bytes, 24, bytes1, 0, count);
+            for (int i = 0; i < bytes1.length;) {
+                MessageData messageData = new MessageData();
+                messageData.setDataType(bytes1[i]);
+                messageData.setDataKey(BytesUtils.BytesToInt(bytes1,i+1));
+                if (bytes1[i]==0){
+                    messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+4));
+                    i = i + 5;
+                }else if (bytes1[i]==10){
+                    messageData.setGYFP2_TYPE_FLOAT32(BytesUtils.getFloat(bytes1,i+4));
+                    i = i + 8;
+                }else if(bytes1[i]==8){
+                    messageData.setGYFP2_TYPE_INT64(BytesUtils.getLong(bytes1,i+4));
+                    i = i + 12;
+                }else if(bytes1[i]==11){
+                    messageData.setGYFP2_TYPE_FLOAT64(BytesUtils.getDouble(bytes1,i+4));
+                    i = i + 12;
+                }
+                messageDataList.add(messageData);
+            }
+        }else if(attribute==5){
+            int count = bytes.length - 27;
+            byte[] bytes1 = new byte[count];
+            System.arraycopy(bytes, 27, bytes1, 0, count);
+            for (int i = 0; i < bytes1.length;) {
+                MessageData messageData = new MessageData();
+                messageData.setDataType(bytes1[i]);
+                if (bytes1[i]==0){
+                    messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+1));
+                    i = i + 2;
+                }else if (bytes1[i]==10){
+                    messageData.setGYFP2_TYPE_FLOAT32(BytesUtils.getFloat(bytes1,i+1));
+                    i = i + 5;
+                }else if(bytes1[i]==8){
+                    messageData.setGYFP2_TYPE_INT64(BytesUtils.getLong(bytes1,i+1));
+                    i = i + 9;
+                }else if(bytes1[i]==11){
+                    messageData.setGYFP2_TYPE_FLOAT64(BytesUtils.getDouble(bytes1,i+1));
+                    i = i + 9;
+                }
+                messageDataList.add(messageData);
+            }
+        }else if(attribute==6){
+            byte[] bytes2 = new byte[9];
+            System.arraycopy(bytes, 16, bytes2, 0, 9);
+            byte[] bytes1 = new byte[bytes.length-(16 + bytes2.length)];
+            System.arraycopy(bytes, 16 + bytes2.length, bytes1, 0, bytes1.length);
+            for (int i = 0; i < bytes1.length;) {
+                MessageData messageData = new MessageData();
+                messageData.setDataKey(BytesUtils.BytesToInt(bytes1,i));
+                if (bytes2[0]==0){
+                    messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+3));
+                    i = i + 4;
+                }else if (bytes2[0]==10){
+                    messageData.setGYFP2_TYPE_FLOAT32(BytesUtils.getFloat(bytes1,i+3));
+                    i = i + 7;
+                }else if(bytes2[0]==8){
+                    messageData.setGYFP2_TYPE_INT64(BytesUtils.getLong(bytes1,i+3));
+                    i = i + 11;
+                }else if(bytes2[0]==11){
+                    messageData.setGYFP2_TYPE_FLOAT64(BytesUtils.getDouble(bytes1,i+3));
+                    i = i + 11;
+                }
+                messageDataList.add(messageData);
+            }
+        }else if(attribute==7){
+            byte[] bytes2 = new byte[12];
+            System.arraycopy(bytes, 16, bytes2, 0, 12);
+            byte[] bytes1 = new byte[bytes.length-(16 + bytes2.length)];
+            System.arraycopy(bytes, 16 + bytes2.length, bytes1, 0, bytes1.length);
+            for (int i = 0; i < bytes1.length;) {
+                MessageData messageData = new MessageData();
+                if (bytes2[0]==0){
+                    messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i));
+                    i = i + 1;
+                }else if (bytes2[0]==10){
+                    messageData.setGYFP2_TYPE_FLOAT32(BytesUtils.getFloat(bytes1,i));
+                    i = i + 4;
+                }else if(bytes2[0]==8){
+                    messageData.setGYFP2_TYPE_INT64(BytesUtils.getLong(bytes1,i));
+                    i = i + 8;
+                }else if(bytes2[0]==11){
+                    messageData.setGYFP2_TYPE_FLOAT64(BytesUtils.getDouble(bytes1,i));
+                    i = i + 8;
+                }
+                messageDataList.add(messageData);
+            }
+        }
+        return messageDataList;
+    }
+}

+ 94 - 0
protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/MessageMutable.java

@@ -0,0 +1,94 @@
+package com.gyee.protocol.gyfp2.message;
+
+
+import com.gyee.edge.common.utils.ByteUtil;
+import com.gyee.edge.common.utils.BytesUtils;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import lombok.Data;
+
+import java.io.Serializable;
+
+
+@Data
+public class MessageMutable implements Serializable {
+    private byte messageType;
+    private int key;
+    private long ts;
+
+
+
+    public byte[] toBytes(int attribute){
+        ByteBuf buffer = Unpooled.buffer(4);
+        if(attribute==1){
+            buffer.writeBytes(BytesUtils.intToByte2(getKey()));
+        }else if(attribute==2){
+            buffer.writeByte(getMessageType());
+        }else if(attribute==3){
+            buffer.writeByte(getMessageType());
+            buffer.writeBytes(BytesUtils.intToByte2(getKey()));
+        }else if(attribute==4){
+            buffer.writeLong(getTs());
+        }else if(attribute==5){
+            buffer.writeBytes(BytesUtils.intToByte2(getKey()));
+            buffer.writeLong(getTs());
+        }else if (attribute==6){
+            buffer.writeByte(getMessageType());
+            buffer.writeLong(getTs());
+        }else if(attribute==7){
+            buffer.writeByte(getMessageType());
+            buffer.writeBytes(BytesUtils.intToByte2(getKey()));
+            buffer.writeLong(getTs());
+        }
+        byte[] bytes = ByteBufUtil.getBytes(buffer);
+        return bytes;
+    }
+
+    //字节数组转MessageMutable对象
+    public static MessageMutable toMessageMutable(byte[] bytes, int attribute){
+        MessageMutable messageMutable = new MessageMutable();
+        if (bytes.length > 16) {
+            if (attribute == 1) {
+                //获取header头部字节大小
+                byte[] bytes1 = new byte[3];
+                System.arraycopy(bytes, 16, bytes1, 0, 3);
+                int i = BytesUtils.BytesToInt(bytes1, 0);
+                messageMutable.setKey(i);
+            }else if (attribute == 2) {
+                byte[] bytes1 = new byte[1];
+                System.arraycopy(bytes, 16, bytes1, 0, 1);
+                messageMutable.setMessageType(bytes1[0]);
+            }else if (attribute == 3) {
+                byte[] bytes1 = new byte[4];
+                System.arraycopy(bytes, 16, bytes1, 0, 4);
+                messageMutable.setMessageType(bytes1[0]);
+                int i = BytesUtils.BytesToInt(bytes1, 1);
+                messageMutable.setKey(i);
+            }else if (attribute == 4) {
+                byte[] bytes1 = new byte[8];
+                System.arraycopy(bytes, 16, bytes1, 0, 8);
+                messageMutable.setTs(ByteUtil.getLong(bytes1, 0));
+            }else if (attribute == 5) {
+                byte[] bytes1 = new byte[11];
+                System.arraycopy(bytes, 16, bytes1, 0, 11);
+                int i = BytesUtils.BytesToInt(bytes1, 0);
+                messageMutable.setKey(i);
+                messageMutable.setTs(BytesUtils.getLong2(bytes1, 3));
+            }else if (attribute == 6) {
+                byte[] bytes1 = new byte[9];
+                System.arraycopy(bytes, 16, bytes1, 0, 9);
+                messageMutable.setMessageType(bytes1[0]);
+                messageMutable.setTs(BytesUtils.getLong2(bytes1, 1));
+            }else if (attribute == 7){
+                byte[] bytes1 = new byte[12];
+                System.arraycopy(bytes, 16, bytes1, 0, 12);
+                messageMutable.setMessageType(bytes1[0]);
+                int i = BytesUtils.BytesToInt(bytes1, 1);
+                messageMutable.setKey(i);
+                messageMutable.setTs(BytesUtils.getLong2(bytes1, 4));
+            }
+        }
+        return messageMutable;
+    }
+}

+ 1 - 0
settings.gradle

@@ -1,6 +1,7 @@
 rootProject.name = "edge"
 include "common:utils"
 include "protocol:iec60870"
+include "protocol:gyfp2"
 include "gateway"
 include "bridge"
 include "loader"