Browse Source

protobuf
protobuf和rocketmq代码提交

‘xugp 2 years ago
parent
commit
f1bac854d7
40 changed files with 2554 additions and 10 deletions
  1. 1 0
      common/utils/build.gradle
  2. 3 3
      common/utils/src/main/java/com/gyee/edge/common/utils/ByteUtil.java
  3. 53 0
      common/utils/src/main/java/com/gyee/edge/common/utils/util/ProtoJsonUtil.java
  4. 0 0
      gateway/README.md
  5. 4 0
      gateway/build.gradle
  6. 30 0
      gateway/src/main/java/com/gyee/edge/gateway/ApplicationEventListener.java
  7. 40 0
      gateway/src/main/java/com/gyee/edge/gateway/bridge/rocketmq/producter/AsyncProducter.java
  8. 84 0
      gateway/src/main/java/com/gyee/edge/gateway/bridge/test/ReadGolden.java
  9. 10 0
      gateway/src/main/java/com/gyee/edge/gateway/bridge/test/StringKey.java
  10. 25 0
      gateway/src/main/java/com/gyee/edge/gateway/config/cache/PointCache.java
  11. 15 0
      gateway/src/main/java/com/gyee/edge/gateway/config/point/PointMapper.java
  12. 46 0
      gateway/src/main/java/com/gyee/edge/gateway/config/point/ReadCSVData.java
  13. 27 0
      gateway/src/main/java/com/gyee/edge/gateway/data/GeneralTsData.java
  14. 15 0
      gateway/src/main/java/com/gyee/edge/gateway/data/TsData.java
  15. 15 0
      gateway/src/main/java/com/gyee/edge/gateway/data/TsDataType.java
  16. 56 0
      gateway/src/main/java/com/gyee/edge/gateway/data/TsPointData.java
  17. 54 0
      gateway/src/main/java/com/gyee/edge/gateway/message/GYMessage.java
  18. 61 0
      gateway/src/main/java/com/gyee/edge/gateway/message/MessageData.java
  19. 52 0
      gateway/src/main/java/com/gyee/edge/gateway/message/MessageMutable.java
  20. 50 0
      gateway/src/main/java/com/gyee/edge/gateway/message/TestHalp.java
  21. 81 0
      gateway/src/main/java/com/gyee/edge/gateway/protobuf/UserProtoTest.java
  22. 24 0
      gateway/src/main/java/com/gyee/edge/gateway/protobuf/user.proto
  23. 5 1
      gateway/src/main/resources/application.yaml
  24. 5 1
      gradle.properties
  25. 11 5
      loader/build.gradle
  26. 30 0
      loader/src/main/java/com/gyee/edge/loader/ApplicationConsumerEventListener.java
  27. 11 0
      loader/src/main/java/com/gyee/edge/loader/data/Datas.java
  28. 27 0
      loader/src/main/java/com/gyee/edge/loader/data/GeneralTsData.java
  29. 15 0
      loader/src/main/java/com/gyee/edge/loader/data/TsData.java
  30. 15 0
      loader/src/main/java/com/gyee/edge/loader/data/TsDataType.java
  31. 59 0
      loader/src/main/java/com/gyee/edge/loader/data/TsPointData.java
  32. 13 0
      loader/src/main/java/com/gyee/edge/loader/data/User.java
  33. 53 0
      loader/src/main/java/com/gyee/edge/loader/golden/DataWrite.java
  34. 1455 0
      loader/src/main/java/com/gyee/edge/loader/protobuf/UserProto.java
  35. 24 0
      loader/src/main/java/com/gyee/edge/loader/protobuf/user.proto
  36. 32 0
      loader/src/main/java/com/gyee/edge/loader/service/RemoteServiceBuilder.java
  37. 17 0
      loader/src/main/java/com/gyee/edge/loader/service/SqlService.java
  38. 22 0
      loader/src/main/java/com/gyee/edge/loader/service/WriteDataService.java
  39. 7 0
      loader/src/main/resources/application.yaml
  40. 7 0
      loader/src/main/resources/banner.txt

+ 1 - 0
common/utils/build.gradle

@@ -7,4 +7,5 @@ dependencies {
     api("com.google.guava:guava:$guavaVersion")
     api("com.alibaba:fastjson:$fastjsonVersion")
     api("commons-beanutils:commons-beanutils:$commonsBeanUtilsVersion")
+    implementation 'com.google.protobuf:protobuf-java-util:3.15.3'
 }

+ 3 - 3
common/utils/src/main/java/com/gyee/edge/common/utils/ByteUtil.java

@@ -49,9 +49,9 @@ public class ByteUtil {
      * @return
      */
     public static int getInt(byte[] bb, int index) {
-        return (int) ((((bb[index + 3] & 0xff) << 24)
-                | ((bb[index + 2] & 0xff) << 16)
-                | ((bb[index + 1] & 0xff) << 8) | ((bb[index + 0] & 0xff) << 0)));
+        return (int) ((((bb[index + 3] & 0xff))
+                | ((bb[index + 2] & 0xff) << 8)
+                | ((bb[index + 1] & 0xff) << 16) | ((bb[index + 0] & 0xff) << 24)));
     }
 
     /**

+ 53 - 0
common/utils/src/main/java/com/gyee/edge/common/utils/util/ProtoJsonUtil.java

@@ -0,0 +1,53 @@
+package com.gyee.edge.common.utils.util;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.util.JsonFormat;
+
+/**
+ * <ul> 注意:
+ *  <li>该实现无法处理含有Any类型字段的Message</li>
+ *  <li>enum类型数据会转化为enum的字符串名</li>
+ *  <li>bytes会转化为utf8编码的字符串</li>
+ * </ul> 以上这段暂未进行测试
+ *
+ * @author wuxiongwei
+ * @date 2021/5/13 16:04
+ * @Description proto 与 Json 转换工具类
+ */
+public class ProtoJsonUtil {
+    /**
+     * proto 对象转 JSON
+     * 使用方法: //反序列化之后
+     *             UserProto.User user1 = UserProto.User.parseFrom(user);
+     *             //转 json
+     *             String jsonObject = ProtoJsonUtil.toJson(user1);
+     * @param sourceMessage proto 对象
+     * @return 返回 JSON 数据
+     * @throws InvalidProtocolBufferException
+     */
+    public static String toJson(Message sourceMessage) throws InvalidProtocolBufferException {
+        if (sourceMessage != null) {
+            String json = JsonFormat.printer().print(sourceMessage);
+            return json;
+        }
+        return null;
+    }
+
+    /**
+     * JSON 转 proto 对象
+     * 使用方法:Message message = ProtoJsonUtil.toObject(UserProto.User.newBuilder(), jsonObject);
+     * @param targetBuilder proto 对象 bulider
+     * @param json          json 数据
+     * @return 返回转换后的 proto 对象
+     * @throws InvalidProtocolBufferException
+     */
+    public static Message toObject(Message.Builder targetBuilder, String json) throws InvalidProtocolBufferException {
+        if (json != null) {
+            //ignoringUnknownFields 如果 json 串中存在的属性 proto 对象中不存在,则进行忽略,否则会抛出异常
+            JsonFormat.parser().ignoringUnknownFields().merge(json, targetBuilder);
+            return targetBuilder.build();
+        }
+        return null;
+    }
+}

gateway/Readme.md → gateway/README.md


+ 4 - 0
gateway/build.gradle

@@ -10,5 +10,9 @@ dependencies {
     implementation("org.apache.logging.log4j:log4j-jul:$log4jVersion")
     implementation("org.apache.logging.log4j:log4j-api:$log4jVersion")
     implementation("org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion")
+    implementation("org.apache.rocketmq:rocketmq-client:$rocketMq")
+    implementation("com.google.protobuf:protobuf-java:$protoBuf")
+    implementation("net.sourceforge.javacsv:javacsv:$Javacsv")
+    implementation 'com.google.protobuf:protobuf-java-util:3.21.2'
 
 }

+ 30 - 0
gateway/src/main/java/com/gyee/edge/gateway/ApplicationEventListener.java

@@ -0,0 +1,30 @@
+package com.gyee.edge.gateway;
+
+import com.gyee.edge.gateway.bridge.rocketmq.producter.AsyncProducter;
+import com.gyee.edge.gateway.config.cache.PointCache;
+import com.gyee.edge.gateway.config.point.ReadCSVData;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class ApplicationEventListener implements
+        ApplicationListener<ApplicationReadyEvent> {
+    @Autowired
+    private PointCache pointCache;
+
+
+    @SneakyThrows
+    @Override
+    public void onApplicationEvent(ApplicationReadyEvent event) {
+        log.info("ApplicationEvent  rised!");
+        log.info("listener: " + event.toString());
+        AsyncProducter.dataSendProducter();
+    }
+
+}

+ 40 - 0
gateway/src/main/java/com/gyee/edge/gateway/bridge/rocketmq/producter/AsyncProducter.java

@@ -0,0 +1,40 @@
+package com.gyee.edge.gateway.bridge.rocketmq.producter;
+
+
+import com.gyee.edge.gateway.bridge.test.ReadGolden;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+
+//异步请求
+@Slf4j
+public class AsyncProducter {
+    public static void dataSendProducter() throws Exception {
+        //实例化消息的生产者
+        DefaultMQProducer producer = new DefaultMQProducer("group_test");
+        //设置nameServer的地址
+        producer.setNamesrvAddr("127.0.0.1:9876");
+        //启动producer实例
+        producer.start();
+        byte[] bytes = ReadGolden.protoData();
+            //创建消息
+            Message message = new Message("keyMessage"/*Topic*/,
+                    "TagA"/* Tag*/, "OrderID888", bytes);
+            producer.send(message, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    System.out.println(sendResult);
+                }
+
+                @Override
+                public void onException(Throwable throwable) {
+                    log.info("消息发送失败", throwable);
+                    throwable.printStackTrace();
+                }
+            });
+//            Thread.sleep(1000);
+//        producer.shutdown();
+    }
+}

+ 84 - 0
gateway/src/main/java/com/gyee/edge/gateway/bridge/test/ReadGolden.java

@@ -0,0 +1,84 @@
+package com.gyee.edge.gateway.bridge.test;
+
+
+import com.gyee.edge.common.utils.ByteUtil;
+import com.gyee.edge.gateway.protobuf.UserProto;
+
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class ReadGolden {
+    public static byte[] protoData() throws UnsupportedEncodingException {
+        String[] tagNames = StringKey.key.split(",");
+        List<String> list = Arrays.asList(tagNames);
+        Map<String, Object> map = new HashMap<>();
+        Map<byte[], byte[]> bl = ReadGolden.putByte(list);
+        Date ST=new Date();
+        System.out.println("序列化前时间"+(ST.getTime()));
+        UserProto.User.Builder user = UserProto.User.newBuilder();
+        UserProto.User.Datas.Builder objectMap = UserProto.User.Datas.newBuilder();
+        for (Map.Entry<byte[],byte[]> entry:bl.entrySet()){
+            String keys = ReadGolden.byteToString(entry.getKey());
+            double val = ByteUtil.getDouble(entry.getValue(), 0);
+            long ts = ByteUtil.getLong2(entry.getValue(), 8);
+            user.putMap(keys, objectMap.setTs(ts).setValue(val).build());
+        }
+        UserProto.User build = user.build();
+        byte[] s = build.toByteArray();
+        Date ET=new Date();
+        System.out.println("序列化后时间"+(ET.getTime()));
+        System.out.println("序列化时间:"+(ET.getTime()-ST.getTime()));
+        return s;
+    }
+
+
+    public static Map<byte[], byte[]> putByte(List<String> lists) throws UnsupportedEncodingException {
+        Map<byte[], byte[]> b = new HashMap<>();
+        byte[][] rawKeys = new byte[lists.size()][];
+        double min = 1.0;
+        double max = 100.0;
+        for (int i = 0; i < lists.size(); i++) {
+            rawKeys[i] = lists.get(i).getBytes("ascii");
+            byte[] bb = {};
+            //随机生成double
+            double generatedDouble = ThreadLocalRandom.current().nextDouble(min, max);
+            byte[] bt1 = ByteUtil.putDouble(bb, generatedDouble, 0);
+            long time = new Date().getTime();
+            byte[] bt2 = ByteUtil.putLong(bb, time, 0);
+            byte[] bt3 = new byte[bt1.length + bt2.length];
+            bt3[0] = bt1[0];
+            bt3[1] = bt1[1];
+            bt3[2] = bt1[2];
+            bt3[3] = bt1[3];
+            bt3[4] = bt1[4];
+            bt3[5] = bt1[5];
+            bt3[6] = bt1[6];
+            bt3[7] = bt1[7];
+            bt3[8] = bt2[0];
+            bt3[9] = bt2[1];
+            bt3[10] = bt2[2];
+            bt3[11] = bt2[3];
+            bt3[12] = bt2[4];
+            bt3[13] = bt2[5];
+            bt3[14] = bt2[6];
+            bt3[15] = bt2[7];
+            b.put(rawKeys[i], bt3);
+        }
+        return b;
+    }
+
+    private static String byteToString (byte[] bytes) {
+        if (null == bytes || bytes.length == 0) {
+            return "";
+        }
+        String strContent = "";
+        try {
+            strContent = new String(bytes, "utf-8");
+        } catch (UnsupportedEncodingException e) {
+            e.printStackTrace();
+        }
+        return strContent;
+    }
+
+}

+ 10 - 0
gateway/src/main/java/com/gyee/edge/gateway/bridge/test/StringKey.java

@@ -0,0 +1,10 @@
+package com.gyee.edge.gateway.bridge.test;
+
+public class StringKey {
+     public static String key ="SHSJ.NX_GD_SH_XXX_XXX_0001_PLD," +
+             "SHSJ.NX_GD_SH_XXX_XXX_0002_PLD," +
+             "SHSJ.NX_GD_SH_XXX_XXX_0003_PLD," +
+             "SHSJ.NX_GD_SH_XXX_XXX_0004_PLD," +
+             "SHSJ.NX_GD_SH_XXX_XXX_0005_PLD," ;
+
+}

+ 25 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/cache/PointCache.java

@@ -0,0 +1,25 @@
+package com.gyee.edge.gateway.config.cache;
+
+import com.gyee.edge.gateway.config.point.PointMapper;
+import com.gyee.edge.gateway.config.point.ReadCSVData;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+
+@Component
+public class PointCache {
+    private ArrayList<PointMapper> strList = new ArrayList<>();
+
+    @Value("${filePath}")
+    String filePath;
+
+    public ArrayList<PointMapper> getPointMapper(){
+        if (strList.size() <= 0){
+            strList = ReadCSVData.readConfig(filePath);
+            return strList;
+        }
+        else
+            return strList;
+    }
+}

+ 15 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/point/PointMapper.java

@@ -0,0 +1,15 @@
+package com.gyee.edge.gateway.config.point;
+
+import lombok.Data;
+
+@Data
+public class PointMapper {
+    private String dataSource;
+    private String protocolSource;
+    private int sourceAddress;
+    private double ratio;//倍率
+    private int messageKey;
+    private String messageId;
+
+
+}

+ 46 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/point/ReadCSVData.java

@@ -0,0 +1,46 @@
+package com.gyee.edge.gateway.config.point;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import com.csvreader.CsvReader;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class ReadCSVData {
+    public static ArrayList<PointMapper> readConfig(String filePath)  {
+        //默认路径
+        if (filePath.length()>0) {
+            return readCsvByCsvReader(filePath);
+        }
+        else{
+            return null;
+        }
+    }
+
+    public static ArrayList<PointMapper> readCsvByCsvReader(String filePath) {
+        ArrayList<PointMapper> strList = null;
+        try {
+            ArrayList<String[]> arrList = new ArrayList<String[]>();
+            strList = new ArrayList<PointMapper>();
+            CsvReader reader = new CsvReader(filePath, ',', Charset.forName("UTF-8"));
+            while (reader.readRecord()) {
+                arrList.add(reader.getValues());
+            }
+            reader.close();
+            for (int row = 0; row < arrList.size(); row++) {
+                PointMapper pointMapper = new PointMapper();
+                pointMapper.setDataSource(arrList.get(row)[0]);
+                pointMapper.setProtocolSource(arrList.get(row)[1]);
+                pointMapper.setSourceAddress(Integer.valueOf(arrList.get(row)[2]));
+                pointMapper.setRatio(Double.valueOf(arrList.get(row)[3]));
+                pointMapper.setMessageKey(Integer.valueOf(arrList.get(row)[4]));
+                pointMapper.setMessageId(arrList.get(row)[5]);
+                strList.add(pointMapper);
+            }
+        } catch (Exception e) {
+            log.error("读取csv文件发生错误",e);
+        }
+        return strList;
+    }
+}

+ 27 - 0
gateway/src/main/java/com/gyee/edge/gateway/data/GeneralTsData.java

@@ -0,0 +1,27 @@
+package com.gyee.edge.gateway.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Optional;
+
+/**
+ * @author songwb<songwb@aliyun.com>
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class GeneralTsData implements TsData {
+
+    private long ts;
+    private short status;
+    private Optional<Double> doubleValue;
+    private Optional<Long> longValue;
+    private Optional<Boolean> booleanValue;
+    private Optional<String> stringValue;
+    private Optional<String> blobValue;
+
+
+}
+

+ 15 - 0
gateway/src/main/java/com/gyee/edge/gateway/data/TsData.java

@@ -0,0 +1,15 @@
+package com.gyee.edge.gateway.data;
+
+/**
+ * @author songwb<songwb@aliyun.com>
+ */
+public interface TsData {
+
+    long getTs();
+
+    short getStatus();
+
+    //double getValue();
+
+}
+

+ 15 - 0
gateway/src/main/java/com/gyee/edge/gateway/data/TsDataType.java

@@ -0,0 +1,15 @@
+package com.gyee.edge.gateway.data;
+
+/**
+ * @author songwb<songwb@aliyun.com>
+ */
+public enum TsDataType {
+    LONG,
+    DOUBLE,
+    BOOLEAN,
+    STRING,
+    BLOB,
+    COORDINATE
+
+}
+

+ 56 - 0
gateway/src/main/java/com/gyee/edge/gateway/data/TsPointData.java

@@ -0,0 +1,56 @@
+package com.gyee.edge.gateway.data;
+
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author songwb<songwb@aliyun.com>
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class TsPointData {
+
+    private String tagName;
+    private GeneralTsData tsData;
+
+    public TsDataType findDataType() {
+        if (tsData.getDoubleValue().isPresent()){
+            return TsDataType.DOUBLE;
+        }
+        else if (tsData.getBooleanValue().isPresent()){
+            return TsDataType.BOOLEAN;
+        }
+        else if (tsData.getLongValue().isPresent()){
+            return TsDataType.LONG;
+        }
+
+        else if (tsData.getStringValue().isPresent()){
+            return TsDataType.STRING;
+        }
+
+        else if (tsData.getBlobValue().isPresent()){
+            return TsDataType.BLOB;
+        }
+
+
+        return TsDataType.DOUBLE;
+    }
+    /**
+     * 获取 double 类型值
+     * @return
+     */
+    public double getValue() {
+        if (tsData.getDoubleValue().isPresent()) {
+            return tsData.getDoubleValue().get();
+        } else if (tsData.getBooleanValue().isPresent()) {
+            return tsData.getBooleanValue().get() ? 1.0 : 0.0;
+        } else if (tsData.getLongValue().isPresent()) {
+            return tsData.getLongValue().get();
+        }
+        return 0.0;
+    }
+}
+

+ 54 - 0
gateway/src/main/java/com/gyee/edge/gateway/message/GYMessage.java

@@ -0,0 +1,54 @@
+package com.gyee.edge.gateway.message;
+
+import com.gyee.edge.common.utils.ByteUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.Data;
+import java.io.*;
+import java.util.List;
+
+@Data
+public class GYMessage implements Serializable {
+    private byte[] header=new byte[]{'G','Y','F','P','V','2','0','0'};
+    private short length;
+    private short count;
+    private int attribute;
+    private List<MessageData> messageDataList;
+    private MessageMutable messageMutable;
+
+
+
+     public byte[] toBytes(){
+         //不指定默认256,指定64以内默认大小都是64.
+         /*如果写入后数据大小未超过512个字节,则选择下一个16的整数倍进行扩容。比如写入数据后大小为12,则扩容后的capacity是16。
+         如果写入后数据大小超过512个字节,则选择下一个2n。比如写入后大小是512字节,则扩容后的capacity是210=1024。(因为29=512,长度已经不够了)
+         扩容不能超过max capacity,否则会报错*/
+         ByteBuf buffer = Unpooled.buffer(10);
+         buffer.writeBytes(getHeader());
+         buffer.writeShort(getLength());
+         buffer.writeShort(getCount());
+         buffer.writeInt(getAttribute());
+         buffer.writeBytes(getMessageMutable().toBytes(getAttribute()));
+         for (MessageData messageData : getMessageDataList()){
+             buffer.writeBytes(messageData.toBytes(getAttribute()));
+         }
+         return buffer.touch().array();
+     }
+
+     public GYMessage toObject(byte[] bytes){
+         GYMessage gyMessage = new GYMessage();
+         gyMessage.setLength(ByteUtil.getShort(bytes,8));
+         gyMessage.setCount(ByteUtil.getShort(bytes,11));
+         gyMessage.setAttribute(ByteUtil.getInt(bytes,12));
+         gyMessage.setMessageMutable(getMessageMutable().toObject(bytes,getAttribute()));
+         gyMessage.setMessageDataList(MessageData.toObject(bytes,getAttribute()));
+         return gyMessage;
+     }
+
+
+
+
+
+
+
+}

+ 61 - 0
gateway/src/main/java/com/gyee/edge/gateway/message/MessageData.java

@@ -0,0 +1,61 @@
+package com.gyee.edge.gateway.message;
+
+import com.gyee.edge.common.utils.ByteUtil;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+
+@Data
+public class MessageData implements Serializable {
+    private int dataKey;
+    private int ts;
+    private byte dataType;
+    private float dataValue;
+
+
+    public byte[] toBytes(int attribute){
+        ByteBuffer byteBuffer = ByteBuffer.allocate(11).order(ByteOrder.BIG_ENDIAN);
+        if(attribute==0){
+            byteBuffer.putFloat(getDataValue());
+        }else if(attribute==1){
+            byteBuffer.putInt(getDataKey());
+            byteBuffer.putFloat(getDataValue());
+        }else if(attribute==2){
+            byteBuffer.putInt(getTs());
+            byteBuffer.putFloat(getDataValue());
+        }else if (attribute==3){
+            byteBuffer.put(getDataType());
+            byteBuffer.putInt(getTs());
+            byteBuffer.putFloat(getDataValue());
+        }return byteBuffer.array();
+    }
+
+    public static List<MessageData> toObject(byte[] bytes, int attribute) {
+        List<MessageData> messageDataList = new ArrayList<>();
+        int count = bytes.length - 25;
+        byte[] bytes1 = new byte[count];
+        System.arraycopy(bytes, 25, bytes1, 0, count);
+        for (int i = 0; i < bytes1.length; i=i+11) {
+            MessageData messageData = new MessageData();
+            if (attribute == 0) {
+                messageData.setDataValue(ByteUtil.getFloat(bytes1,i));
+            } else if (attribute == 1) {
+                messageData.setDataKey(ByteUtil.getInt(bytes1,i));
+                messageData.setDataValue(ByteUtil.getFloat(bytes1,i+4));
+            } else if (attribute == 2) {
+                messageData.setTs(ByteUtil.getInt(bytes1,i));
+                messageData.setDataValue(ByteUtil.getFloat(bytes1,i+4));
+            } else if (attribute == 3) {
+                messageData.setDataType(bytes1[i]);
+                messageData.setTs(ByteUtil.getInt(bytes1,i+1));
+                messageData.setDataValue(ByteUtil.getFloat(bytes1,i+5));
+            }
+            messageDataList.add(messageData);
+        }
+        return messageDataList;
+    }
+}

+ 52 - 0
gateway/src/main/java/com/gyee/edge/gateway/message/MessageMutable.java

@@ -0,0 +1,52 @@
+package com.gyee.edge.gateway.message;
+
+
+import com.gyee.edge.common.utils.ByteUtil;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+
+@Data
+public class MessageMutable implements Serializable {
+    private byte messageType;
+    private int key;
+    private int ts;
+
+    public byte[] toBytes(int attribute){
+        ByteBuffer byteBuffer = ByteBuffer.allocate(9).order(ByteOrder.BIG_ENDIAN);
+        if(attribute==0){
+            byteBuffer.putInt(getKey());
+        }else if(attribute==1){
+            byteBuffer.put(getMessageType());
+        }else if (attribute==2){
+            byteBuffer.putInt(getTs());
+        }else if(attribute==3){
+            byteBuffer.put(getMessageType());
+            byteBuffer.putInt(getKey());
+            byteBuffer.putInt(getTs());
+        }
+        byte[] array = byteBuffer.array();
+        return array;
+    }
+
+    public MessageMutable toObject(byte[] bytes,int attribute){
+        byte[] bytes1 = new byte[9];
+        System.arraycopy(bytes,16,bytes1,0,9);
+        MessageMutable messageMutable = new MessageMutable();
+        if(attribute==0){
+            messageMutable.setKey(ByteUtil.getInt(bytes1,0));
+        }else if(attribute==1){
+            messageMutable.setMessageType(bytes1[0]);
+        }else if (attribute==2){
+            messageMutable.setTs(ByteUtil.getInt(bytes1,0));
+        }else if(attribute==3){
+            messageMutable.setMessageType(bytes1[0]);
+            messageMutable.setKey(ByteUtil.getInt(bytes1,1));
+            messageMutable.setTs(ByteUtil.getInt(bytes1,5));
+        }
+        return messageMutable;
+    }
+}

+ 50 - 0
gateway/src/main/java/com/gyee/edge/gateway/message/TestHalp.java

@@ -0,0 +1,50 @@
+package com.gyee.edge.gateway.message;
+import com.google.gson.Gson;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class TestHalp {
+    public static void main(String[] args) {
+        GYMessage gyMessage = new GYMessage();
+        gyMessage.setAttribute(3);
+        gyMessage.setCount((short) 4);
+        gyMessage.setLength(gyMessage.getLength());
+
+        MessageMutable messageMutable = new MessageMutable();
+        messageMutable.setMessageType((byte) 1);
+        messageMutable.setKey(1);
+        messageMutable.setTs(10);
+        gyMessage.setMessageMutable(messageMutable);
+
+        List<MessageData> list = new ArrayList<>();
+        MessageData messageData = new MessageData();
+        messageData.setDataKey(1);
+        messageData.setDataType((byte) 1);
+        messageData.setDataValue(1);
+        messageData.setTs(1);
+
+
+        list.add(messageData);
+        gyMessage.setMessageDataList(list);
+        byte[] bytes = gyMessage.toBytes();
+
+        System.out.println(bytes);
+        GYMessage gyMessage1 = gyMessage.toObject(bytes);
+        System.out.println(gyMessage1);
+
+
+
+//        byte[] objectToByteArray = gyMessage.objectToByteArray(gyMessage);
+//        System.err.println("objectToByteArray: " + objectToByteArray);
+//
+//
+//        Object byteArrayToObject = gyMessage.byteArrayToObject(objectToByteArray);
+//        System.err.println("byteArrayToObject: " + byteArrayToObject.toString());
+//        GYMessage d=(GYMessage)byteArrayToObject;
+//        System.out.println(JSON.toJSONString(d));
+    }
+
+
+}

+ 81 - 0
gateway/src/main/java/com/gyee/edge/gateway/protobuf/UserProtoTest.java

@@ -0,0 +1,81 @@
+package com.gyee.edge.gateway.protobuf;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.MessageOrBuilder;
+import com.google.protobuf.TextFormat;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class UserProtoTest {
+//    public static void main(String[] args) {
+//        UserProto.User.Builder user = UserProto.User.newBuilder();
+//        System.out.println("测试两千个点数据:");
+//        Map<String,Double> map = new HashMap<>();
+//        for(int i = 1;i<=2000;i++){
+//            map.put(i+"",Math.random());
+//        }
+//        user.putAllMap(map);
+//        Date ST=new Date();
+//        UserProto.User build1 = user.build();
+//        byte[] s1 = build1.toByteArray();
+//        Date ET=new Date();
+//        System.out.println("序列化时间:"+(ET.getTime()-ST.getTime()));
+//        System.out.println("protobuf序列化大小: " + s1.length);
+//        UserProto.User user1 = null;
+//        try {
+//            //反序列化
+//            Date date1 = new Date();
+//            user1 = UserProto.User.parseFrom(s1);
+//            Date date2 = new Date();
+//            System.out.println("反序列化时间"+(date2.getTime()-date1.getTime()));
+//        } catch (InvalidProtocolBufferException e) {
+//            e.printStackTrace();
+//        }
+//
+//        System.out.println("--------------------");
+//
+//        System.out.println("测试两千个点一半数据空值测试:");
+//        user.clear();
+//        map.clear();
+//        for(int j = 1;j<=2000;j++){
+//            if (j<=1000){
+//                map.put(j+"",Math.random());
+//            }else {
+//                map.put(j+"",null);
+//            }
+//        }
+//        user.putAllMap(map);
+//        Date ST1=new Date();
+//        UserProto.User build2 = user.build();
+//        byte[] s2 = build2.toByteArray();
+//        Date ET2=new Date();
+//        System.out.println("序列化时间:"+(ET2.getTime()-ST1.getTime()));
+//        System.out.println("protobuf一半数据空值序列化大小: " + s2.length);
+//        UserProto.User users2 = null;
+//        try {
+//            //反序列化
+//            Date date3 = new Date();
+//            users2 = UserProto.User.parseFrom(s2);
+//            Date date4 = new Date();
+//            System.out.println("反序列化时间"+(date4.getTime()-date3.getTime()));
+//        } catch (InvalidProtocolBufferException e) {
+//            e.printStackTrace();
+//        }
+//    }
+
+    /**
+     * 处理反序列化时中文出现的八进制问题(属性值为中文时可能会出现这样的八进制\346\223\215\344\275\234\345\221\230)
+     * 可直接使用 protobuf 自带的 TextFormat.printToUnicodeString(message) 方法,但是这个方法过时了,直接从这个方法内部拿出来使用就可以了
+     *
+     * @param message 转换的 protobuf 对象
+     * @return string
+     */
+    public static String printToUnicodeString(MessageOrBuilder message) {
+        return TextFormat.printer().escapingNonAscii(false).printToString(message);
+    }
+
+}

+ 24 - 0
gateway/src/main/java/com/gyee/edge/gateway/protobuf/user.proto

@@ -0,0 +1,24 @@
+//使用 proto3 语法 ,未指定则使用proto2
+syntax = "proto3";
+
+// proto 文件包名
+package com.gyee.edge.gateway.protobuf;
+
+//生成 proto 文件所在包路径,一般来说是和文件包名一致就可以
+option java_package = "com.gyee.edge.gateway.protobuf";
+
+//生成 proto 的文件名
+option java_outer_classname="UserProto";
+
+
+//创建一个 User 对象
+message User{
+  // 定义简单的 Map string
+  map<string, Datas> map = 8;
+
+  message Datas {
+    int64 ts = 1;
+    double value = 2;
+  }
+}
+

+ 5 - 1
gateway/src/main/resources/application.yaml

@@ -1,5 +1,9 @@
 server:
-  port: 8080
+  port: 8012
 spring:
   application:
   name: gateway
+
+
+filePath: D:\\data.csv
+

+ 5 - 1
gradle.properties

@@ -36,4 +36,8 @@ log4jVersion=2.17.1
 fastjsonVersion=1.2.58
 commonsBeanUtilsVersion=1.9.4
 alibabaDruidVersion=1.2.9
-gsonVersion=2.8.5
+gsonVersion=2.8.5
+rocketMq=4.8.0
+protoBuf=3.21.2
+openFeignVersion=11.8
+Javacsv=2.0

+ 11 - 5
loader/build.gradle

@@ -2,9 +2,15 @@ buildscript {
 }
 
 dependencies {
-    api("commons-codec:commons-codec:$commonsCodecVersion")
-    api("org.apache.commons:commons-lang3:$commonsLang3Version")
-    api("com.google.guava:guava:$guavaVersion")
-    api("com.alibaba:fastjson:$fastjsonVersion")
-    api("commons-beanutils:commons-beanutils:$commonsBeanUtilsVersion")
+    implementation project(":common:utils")
+    implementation("$bootGroup:spring-boot-starter:$springBootVersion")
+    implementation("com.google.code.gson:gson:$gsonVersion")
+    implementation("org.apache.logging.log4j:log4j-core:$log4jVersion")
+    implementation("org.apache.logging.log4j:log4j-jul:$log4jVersion")
+    implementation("org.apache.logging.log4j:log4j-api:$log4jVersion")
+    implementation("org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion")
+    implementation("org.apache.rocketmq:rocketmq-client:$rocketMq")
+    implementation("com.google.protobuf:protobuf-java:$protoBuf")
+    implementation("io.github.openfeign:feign-core:$openFeignVersion")
+    implementation("io.github.openfeign:feign-jackson:$openFeignVersion")
 }

+ 30 - 0
loader/src/main/java/com/gyee/edge/loader/ApplicationConsumerEventListener.java

@@ -0,0 +1,30 @@
+package com.gyee.edge.loader;
+
+import com.gyee.edge.loader.golden.DataWrite;
+import com.gyee.edge.loader.rocketmq.comuser.BalanceComuser;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class ApplicationConsumerEventListener implements
+        ApplicationListener<ApplicationReadyEvent> {
+
+    @Autowired
+    private DataWrite dataWrite;
+
+    @SneakyThrows
+    @Override
+    public void onApplicationEvent(ApplicationReadyEvent event) {
+        log.info("ApplicationEvent  rised!");
+        log.info("listener: " + event.toString());
+        BalanceComuser balanceComuser = new BalanceComuser();
+        balanceComuser.setWriteGolden(dataWrite);
+        balanceComuser.DataConsumer();
+    }
+
+}

+ 11 - 0
loader/src/main/java/com/gyee/edge/loader/data/Datas.java

@@ -0,0 +1,11 @@
+package com.gyee.edge.loader.data;
+
+import lombok.Data;
+
+
+@Data
+public class Datas {
+    private long ts;
+    private double value;
+
+}

+ 27 - 0
loader/src/main/java/com/gyee/edge/loader/data/GeneralTsData.java

@@ -0,0 +1,27 @@
+package com.gyee.edge.loader.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Optional;
+
+/**
+ * @author songwb<songwb@aliyun.com>
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class GeneralTsData implements TsData {
+
+    private long ts;
+    private short status;
+    private Optional<Double> doubleValue;
+    private Optional<Long> longValue;
+    private Optional<Boolean> booleanValue;
+    private Optional<String> stringValue;
+    private Optional<String> blobValue;
+
+
+}
+

+ 15 - 0
loader/src/main/java/com/gyee/edge/loader/data/TsData.java

@@ -0,0 +1,15 @@
+package com.gyee.edge.loader.data;
+
+/**
+ * @author songwb<songwb@aliyun.com>
+ */
+public interface TsData {
+
+    long getTs();
+
+    short getStatus();
+
+    //double getValue();
+
+}
+

+ 15 - 0
loader/src/main/java/com/gyee/edge/loader/data/TsDataType.java

@@ -0,0 +1,15 @@
+package com.gyee.edge.loader.data;
+
+/**
+ * @author songwb<songwb@aliyun.com>
+ */
+public enum TsDataType {
+    LONG,
+    DOUBLE,
+    BOOLEAN,
+    STRING,
+    BLOB,
+    COORDINATE
+
+}
+

+ 59 - 0
loader/src/main/java/com/gyee/edge/loader/data/TsPointData.java

@@ -0,0 +1,59 @@
+package com.gyee.edge.loader.data;
+
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * @author songwb<songwb@aliyun.com>
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class TsPointData implements Serializable {
+
+    private String tagName;
+    private GeneralTsData tsData;
+
+    public TsDataType findDataType() {
+//        if (tsData.getDoubleValue().isPresent()){
+//            return TsDataType.DOUBLE;
+//        }
+         if (tsData.getBooleanValue().isPresent()){
+            return TsDataType.BOOLEAN;
+        }
+        else if (tsData.getLongValue().isPresent()){
+            return TsDataType.LONG;
+        }
+
+        else if (tsData.getStringValue().isPresent()){
+            return TsDataType.STRING;
+        }
+
+        else if (tsData.getBlobValue().isPresent()){
+            return TsDataType.BLOB;
+        }
+
+
+        return TsDataType.DOUBLE;
+    }
+    /**
+     * 获取 double 类型值
+     * @return
+     */
+//    public Optional<Double> getValue() {
+//        if (tsData.getDoubleValue().isPresent()) {
+//            return tsData.getDoubleValue().get();
+//        } else if (tsData.getBooleanValue().isPresent()) {
+//            return tsData.getBooleanValue().get() ? 1.0 : 0.0;
+//        } else if (tsData.getLongValue().isPresent()) {
+//            return tsData.getLongValue().get();
+//        }
+//        return 0.0;
+//    }
+}
+

+ 13 - 0
loader/src/main/java/com/gyee/edge/loader/data/User.java

@@ -0,0 +1,13 @@
+package com.gyee.edge.loader.data;
+
+import lombok.Data;
+
+import java.util.List;
+import java.util.Map;
+
+
+@Data
+public class User {
+    private Map<String,Datas> map;
+
+}

+ 53 - 0
loader/src/main/java/com/gyee/edge/loader/golden/DataWrite.java

@@ -0,0 +1,53 @@
+package com.gyee.edge.loader.golden;
+
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.gyee.edge.loader.data.Datas;
+import com.gyee.edge.loader.data.GeneralTsData;
+import com.gyee.edge.loader.data.TsPointData;
+import com.gyee.edge.loader.data.User;
+import com.gyee.edge.loader.service.SqlService;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+
+@Slf4j
+@Component
+public class DataWrite {
+    @Autowired
+    private SqlService sqlService;
+
+    public void DataWriteGloden(String jsonObject) throws Exception {
+        List<TsPointData> tsDataList = new ArrayList<>();
+        try {
+            Date ts = new Date();
+            System.out.println("写入golden前时间"+(ts.getTime()));
+            //通过 proto Json 数据转 Java 对象
+            Gson GSON = new GsonBuilder().serializeNulls().create();
+            User user3 = GSON.fromJson(jsonObject, User.class);
+            Map<String, Datas> maps =  user3.getMap();
+            for (Map.Entry<String, Datas> entry:maps.entrySet()){
+                TsPointData tsPointData = new TsPointData();
+                GeneralTsData generalTsData = new GeneralTsData();
+                tsPointData.setTagName(entry.getKey());
+                generalTsData.setTs(entry.getValue().getTs());
+                double value = entry.getValue().getValue();
+                generalTsData.setDoubleValue(Optional.ofNullable(Double.valueOf(value)));
+                tsPointData.setTsData(generalTsData);
+                tsDataList.add(tsPointData);
+            }
+            sqlService.saveDataSnaps(tsDataList);
+            Date es = new Date();
+            System.out.println("写入golden后时间"+(ts.getTime()));
+            log.info("写入golden时间"+(es.getTime()-ts.getTime()));
+        }catch (Exception e){
+            log.error("转化map发生异常",e);
+        }
+    }
+
+}

File diff suppressed because it is too large
+ 1455 - 0
loader/src/main/java/com/gyee/edge/loader/protobuf/UserProto.java


+ 24 - 0
loader/src/main/java/com/gyee/edge/loader/protobuf/user.proto

@@ -0,0 +1,24 @@
+//使用 proto3 语法 ,未指定则使用proto2
+syntax = "proto3";
+
+// proto 文件包名
+package com.gyee.edge.gateway.protobuf;
+
+//生成 proto 文件所在包路径,一般来说是和文件包名一致就可以
+option java_package = "com.gyee.edge.gateway.protobuf";
+
+//生成 proto 的文件名
+option java_outer_classname="UserProto";
+
+
+//创建一个 User 对象
+message User{
+  // 定义简单的 Map string
+  map<string, Datas> map = 8;
+
+  message Datas {
+    int64 ts = 1;
+    double value = 2;
+  }
+}
+

+ 32 - 0
loader/src/main/java/com/gyee/edge/loader/service/RemoteServiceBuilder.java

@@ -0,0 +1,32 @@
+package com.gyee.edge.loader.service;
+
+
+import feign.Feign;
+import feign.Request;
+import feign.Retryer;
+import feign.jackson.JacksonDecoder;
+import feign.jackson.JacksonEncoder;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+@Configuration
+public class RemoteServiceBuilder {
+
+    @Value("${adapterUrl}")
+    String adapterUrl;
+
+    @Bean
+    public WriteDataService writeDataService() {
+        return Feign.builder()
+                .encoder(new JacksonEncoder())
+                .decoder(new JacksonDecoder())
+                .options(new Request.Options(1000, 3500))
+                .retryer(new Retryer.Default(5000, 5000, 3))
+                .target(WriteDataService.class, adapterUrl);
+    }
+
+
+
+}

+ 17 - 0
loader/src/main/java/com/gyee/edge/loader/service/SqlService.java

@@ -0,0 +1,17 @@
+package com.gyee.edge.loader.service;
+
+import com.gyee.edge.loader.data.TsPointData;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+@Service
+public class SqlService {
+    @Autowired
+    protected RemoteServiceBuilder remoteServiceBuilder;
+    public void saveDataSnaps(List<TsPointData> tsDataList) {
+
+        remoteServiceBuilder.writeDataService().saveTspointDatas(tsDataList);
+    }
+}

+ 22 - 0
loader/src/main/java/com/gyee/edge/loader/service/WriteDataService.java

@@ -0,0 +1,22 @@
+package com.gyee.edge.loader.service;
+
+
+import com.gyee.edge.loader.data.TsData;
+import com.gyee.edge.loader.data.TsPointData;
+import feign.Headers;
+import feign.Param;
+import feign.RequestLine;
+
+import java.util.List;
+import java.util.Map;
+
+public interface WriteDataService {
+
+    @Headers({"Content-Type: application/json","Accept: application/json"})
+    @RequestLine("POST /latest/batch")
+    void saveTspointDatas(List<TsPointData> tsDataList);
+
+
+    @RequestLine("GET /latest?keys={keys}")
+    Map<String, TsData> getData(@Param(value = "keys")String tagName);
+}

+ 7 - 0
loader/src/main/resources/application.yaml

@@ -0,0 +1,7 @@
+server:
+  port: 8080
+spring:
+  application:
+  name: loader
+
+adapterUrl: http://127.0.0.1:8011/ts

+ 7 - 0
loader/src/main/resources/banner.txt

@@ -0,0 +1,7 @@
+( \      (  ___  )(  ___  )(  __  \ (  ____ \(  ____ )
+| (      | (   ) || (   ) || (  \  )| (    \/| (    )|
+| |      | |   | || (___) || |   ) || (__    | (____)|
+| |      | |   | ||  ___  || |   | ||  __)   |     __)
+| |      | |   | || (   ) || |   ) || (      | (\ (   
+| (____/\| (___) || )   ( || (__/  )| (____/\| ) \ \__
+(_______/(_______)|/     \|(______/ (_______/|/   \__/