Browse Source

数据写入golden代码融进loader,读取dbpointmapper数据写进缓存

‘xugp 2 years ago
parent
commit
1eae14703e
31 changed files with 989 additions and 526 deletions
  1. 5 0
      common/utils/src/main/java/com/gyee/edge/common/utils/BytesUtils.java
  2. 0 1
      gateway/src/main/java/com/gyee/edge/gateway/ApplicationEventListener.java
  3. 4 3
      gateway/src/main/java/com/gyee/edge/gateway/bridge/rocketmq/producter/AsyncProducter.java
  4. 7 7
      gateway/src/main/java/com/gyee/edge/gateway/bridge/rocketmq/producter/Gyfp2Protocol.java
  5. 2 3
      gateway/src/main/resources/application.yaml
  6. 1 0
      loader/build.gradle
  7. 79 69
      loader/src/main/java/com/gyee/edge/loader/adapter/DataWrite.java
  8. 16 0
      loader/src/main/java/com/gyee/edge/loader/data/Coordinate.java
  9. 13 5
      loader/src/main/java/com/gyee/edge/loader/data/GeneralTsData.java
  10. 13 0
      loader/src/main/java/com/gyee/edge/loader/data/TsData.java
  11. 13 0
      loader/src/main/java/com/gyee/edge/loader/data/TsDataType.java
  12. 19 32
      loader/src/main/java/com/gyee/edge/loader/data/TsPointData.java
  13. 259 0
      loader/src/main/java/com/gyee/edge/loader/golden/GoldenLatestDao.java
  14. 26 0
      loader/src/main/java/com/gyee/edge/loader/golden/ILatestDao.java
  15. 62 0
      loader/src/main/java/com/gyee/edge/loader/golden/config/GoldenConfig.java
  16. 361 0
      loader/src/main/java/com/gyee/edge/loader/golden/config/GoldenConnectionPool.java
  17. 2 1
      loader/src/main/java/com/gyee/edge/loader/mapper/DBPointMapper.java
  18. 6 9
      loader/src/main/java/com/gyee/edge/loader/rocketmq/comuser/BalanceComuser.java
  19. 8 4
      loader/src/main/java/com/gyee/edge/loader/sqlite/ReadDBPointData.java
  20. 63 0
      loader/src/main/java/com/gyee/edge/loader/sqlite/cache/CacheService.java
  21. 0 364
      loader/src/main/java/com/gyee/edge/loader/sqlite/config/DB.java
  22. 0 17
      loader/src/main/java/com/gyee/edge/loader/sqlite/config/Database.java
  23. BIN
      loader/src/main/lib/commons-beanutils-1.8.3.jar
  24. BIN
      loader/src/main/lib/commons-logging-1.1.1.jar
  25. BIN
      loader/src/main/lib/golden-java-sdk-3.0.27.jar
  26. BIN
      loader/src/main/lib/protobuf-java-2.6.1.jar
  27. 12 0
      loader/src/main/resources/application.yaml
  28. BIN
      loader/src/main/resources/dbpointmapper.sqlite3
  29. 8 2
      protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/GYMessage.java
  30. 6 5
      protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/MessageData.java
  31. 4 4
      protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/MessageMutable.java

+ 5 - 0
common/utils/src/main/java/com/gyee/edge/common/utils/BytesUtils.java

@@ -1321,6 +1321,11 @@ public class BytesUtils {
 
 
     }
     }
 
 
+    public static int bytesToInts(byte[] arr,int t) {
+        int r = ( (arr[t] & 0x0F) << 16) + ((arr[1] & 0xFF) << 8) + (arr[2] & 0xFF);
+        return r;
+    }
+
     /**
     /**
 
 
      * 将字节数组转换为整型数值
      * 将字节数组转换为整型数值

+ 0 - 1
gateway/src/main/java/com/gyee/edge/gateway/ApplicationEventListener.java

@@ -1,7 +1,6 @@
 package com.gyee.edge.gateway;
 package com.gyee.edge.gateway;
 
 
 import com.gyee.edge.gateway.bridge.rocketmq.producter.AsyncProducter;
 import com.gyee.edge.gateway.bridge.rocketmq.producter.AsyncProducter;
-import com.gyee.edge.gateway.config.cache.CacheService;
 import lombok.SneakyThrows;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;

+ 4 - 3
gateway/src/main/java/com/gyee/edge/gateway/bridge/rocketmq/producter/AsyncProducter.java

@@ -18,6 +18,9 @@ public class AsyncProducter {
     @Value("${rocketmq.namesrvaddr}")
     @Value("${rocketmq.namesrvaddr}")
     String nameSrvAddr;
     String nameSrvAddr;
 
 
+    @Value("${rocketmq.topic}")
+    String topic;
+
     @Value("${rocketmq.tags}")
     @Value("${rocketmq.tags}")
     String tags;
     String tags;
 
 
@@ -30,12 +33,10 @@ public class AsyncProducter {
         producer.setNamesrvAddr(nameSrvAddr);
         producer.setNamesrvAddr(nameSrvAddr);
         //启动producer实例
         //启动producer实例
         producer.start();
         producer.start();
-        //支持protobuf协议          byte[] bytes = ReadGolden.protoData();
         //支持GYFP2协议
         //支持GYFP2协议
         byte[] bytes = gyfp2Protocol.toBytes();
         byte[] bytes = gyfp2Protocol.toBytes();
             //创建消息
             //创建消息
-            Message message = new Message("keyMessage"/*Topic*/,
-                    tags/* Tag*/, "OrderID888", bytes);
+            Message message = new Message(topic/*Topic*/, tags,bytes);
             producer.send(message, new SendCallback() {
             producer.send(message, new SendCallback() {
                 @Override
                 @Override
                 public void onSuccess(SendResult sendResult) {
                 public void onSuccess(SendResult sendResult) {

+ 7 - 7
gateway/src/main/java/com/gyee/edge/gateway/bridge/rocketmq/producter/Gyfp2Protocol.java

@@ -18,10 +18,10 @@ public class Gyfp2Protocol {
         gyMessage.setAttribute(0);
         gyMessage.setAttribute(0);
 
 
         MessageMutable messageMutable = new MessageMutable();
         MessageMutable messageMutable = new MessageMutable();
-        messageMutable.setMessageType((byte) 11);
-        messageMutable.setKey(1002);
+        /*messageMutable.setMessageType((byte) 11);
+        messageMutable.setKey(4500000);
         messageMutable.setTs(new Date().getTime());
         messageMutable.setTs(new Date().getTime());
-        gyMessage.setMessageMutable(messageMutable);
+        gyMessage.setMessageMutable(messageMutable);*/
 
 
         List<MessageData> list = new ArrayList<>();
         List<MessageData> list = new ArrayList<>();
 //        for (int i = 0;i < 1000;i++){
 //        for (int i = 0;i < 1000;i++){
@@ -32,21 +32,21 @@ public class Gyfp2Protocol {
 //            messageData.setGYFP2_TYPE_FLOAT64(java.util.Optional.of(d));
 //            messageData.setGYFP2_TYPE_FLOAT64(java.util.Optional.of(d));
 //            list.add(messageData);
 //            list.add(messageData);
 //        }
 //        }
-
+        double d = 1 + Math.random() * (80 - 1);
         MessageData messageData = new MessageData();
         MessageData messageData = new MessageData();
         messageData.setDataKey(4500000);
         messageData.setDataKey(4500000);
         messageData.setDataType((byte) 0);
         messageData.setDataType((byte) 0);
         messageData.setTs(new Date().getTime());
         messageData.setTs(new Date().getTime());
-        messageData.setGYFP2_TYPE_BOOLEAN(java.util.Optional.of(false));
+        messageData.setGYFP2_TYPE_BOOLEAN(java.util.Optional.of(true));
         list.add(messageData);
         list.add(messageData);
 
 
         MessageData messageData1 = new MessageData();
         MessageData messageData1 = new MessageData();
-        messageData1.setDataKey(4500000);
+        messageData1.setDataKey(4500001);
         messageData1.setDataType((byte) 11);
         messageData1.setDataType((byte) 11);
         messageData1.setTs(new Date().getTime());
         messageData1.setTs(new Date().getTime());
-        double d = 1 + Math.random() * (80 - 1);
         messageData1.setGYFP2_TYPE_FLOAT64(java.util.Optional.of(d));
         messageData1.setGYFP2_TYPE_FLOAT64(java.util.Optional.of(d));
         list.add(messageData1);
         list.add(messageData1);
+
         gyMessage.setMessageDataList(list);
         gyMessage.setMessageDataList(list);
         Date sTdate = new Date();
         Date sTdate = new Date();
         byte[] bytes = gyMessage.toBytes();
         byte[] bytes = gyMessage.toBytes();

+ 2 - 3
gateway/src/main/resources/application.yaml

@@ -12,6 +12,5 @@ spring:
 
 
 rocketmq:
 rocketmq:
   namesrvaddr: 127.0.0.1:9876
   namesrvaddr: 127.0.0.1:9876
-  tags: MHS
-filePath: D:\\data.csv
-
+  topic: UPWARD_MHS_
+  tags: DJL_EP01

+ 1 - 0
loader/build.gradle

@@ -27,4 +27,5 @@ dependencies {
     implementation("io.github.openfeign:feign-core:$openFeignVersion")
     implementation("io.github.openfeign:feign-core:$openFeignVersion")
     implementation("io.github.openfeign:feign-jackson:$openFeignVersion")
     implementation("io.github.openfeign:feign-jackson:$openFeignVersion")
     implementation("org.xerial:sqlite-jdbc:$sqliteJdbc")
     implementation("org.xerial:sqlite-jdbc:$sqliteJdbc")
+    implementation fileTree(dir: 'src/main/lib', include: '*.jar')
 }
 }

+ 79 - 69
loader/src/main/java/com/gyee/edge/loader/adapter/DataWrite.java

@@ -3,11 +3,9 @@ package com.gyee.edge.loader.adapter;
 
 
 import com.gyee.edge.loader.data.GeneralTsData;
 import com.gyee.edge.loader.data.GeneralTsData;
 import com.gyee.edge.loader.data.TsPointData;
 import com.gyee.edge.loader.data.TsPointData;
+import com.gyee.edge.loader.golden.ILatestDao;
 import com.gyee.edge.loader.mapper.DBPointMapper;
 import com.gyee.edge.loader.mapper.DBPointMapper;
-
-import com.gyee.edge.loader.service.SqlService;
-
-import com.gyee.edge.loader.sqlite.ReadDBPointData;
+import com.gyee.edge.loader.sqlite.cache.CacheService;
 import com.gyee.protocol.gyfp2.message.GYMessage;
 import com.gyee.protocol.gyfp2.message.GYMessage;
 import com.gyee.protocol.gyfp2.message.MessageData;
 import com.gyee.protocol.gyfp2.message.MessageData;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
@@ -20,19 +18,20 @@ import java.util.*;
 /**
 /**
  * @author :
  * @author :
  * @date :Created in 2022/8/25 17:34
  * @date :Created in 2022/8/25 17:34
- * @description:  数据
+ * @description:  数据写入
  */
  */
 
 
 @Slf4j
 @Slf4j
 @Component
 @Component
 public class DataWrite {
 public class DataWrite {
     @Autowired
     @Autowired
-    private SqlService sqlService;
+    private ILatestDao iLatestDao;
     
     
     @Autowired
     @Autowired
-    private ReadDBPointData readDBPointData;
+    private CacheService cacheService;
 
 
-    public void DataWriteGloden(GYMessage gyMessage) throws Exception {
+
+    public void DataWriteGloden(GYMessage gyMessage,String topic) throws Exception {
         List<TsPointData> tsDataList = new ArrayList<>();
         List<TsPointData> tsDataList = new ArrayList<>();
         List<MessageData> messageDataList = gyMessage.getMessageDataList();
         List<MessageData> messageDataList = gyMessage.getMessageDataList();
         try {
         try {
@@ -40,18 +39,18 @@ public class DataWrite {
                 for(MessageData messageData:messageDataList){
                 for(MessageData messageData:messageDataList){
                     TsPointData tsPointData = new TsPointData();
                     TsPointData tsPointData = new TsPointData();
                     GeneralTsData generalTsData = new GeneralTsData();
                     GeneralTsData generalTsData = new GeneralTsData();
-                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(messageData.getDataKey());
-                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
-                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                    String key = getPoint_key(messageData.getDataKey(),topic);
+                    if(key != null && key.length()>0) {
+                        tsPointData.setTagName(key);
                         generalTsData.setTs(messageData.getTs());
                         generalTsData.setTs(messageData.getTs());
                         if (messageData.getDataType() == 0) {
                         if (messageData.getDataType() == 0) {
-                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN());
                         } else if (messageData.getDataType() == 10) {
                         } else if (messageData.getDataType() == 10) {
-                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32());
                         } else if (messageData.getDataType() == 8) {
                         } else if (messageData.getDataType() == 8) {
-                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64());
                         } else if (messageData.getDataType() == 11) {
                         } else if (messageData.getDataType() == 11) {
-                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64());
                         }
                         }
                         generalTsData.setStatus((short) 1);
                         generalTsData.setStatus((short) 1);
                         tsPointData.setTsData(generalTsData);
                         tsPointData.setTsData(generalTsData);
@@ -62,19 +61,19 @@ public class DataWrite {
                 for(int i = 0; i < gyMessage.getMessageDataList().size();i++){
                 for(int i = 0; i < gyMessage.getMessageDataList().size();i++){
                     TsPointData tsPointData = new TsPointData();
                     TsPointData tsPointData = new TsPointData();
                     GeneralTsData generalTsData = new GeneralTsData();
                     GeneralTsData generalTsData = new GeneralTsData();
-                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(gyMessage.getMessageMutable().getKey() + i);
-                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
+                    String key = getPoint_key(gyMessage.getMessageMutable().getKey() + i,topic);
+                    if(key != null && key.length()>0) {
                         MessageData messageData = gyMessage.getMessageDataList().get(i);
                         MessageData messageData = gyMessage.getMessageDataList().get(i);
-                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                        tsPointData.setTagName(key);
                         generalTsData.setTs(messageData.getTs());
                         generalTsData.setTs(messageData.getTs());
                         if (messageData.getDataType() == 0) {
                         if (messageData.getDataType() == 0) {
-                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN());
                         } else if (messageData.getDataType() == 10) {
                         } else if (messageData.getDataType() == 10) {
-                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32());
                         } else if (messageData.getDataType() == 8) {
                         } else if (messageData.getDataType() == 8) {
-                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64());
                         } else if (messageData.getDataType() == 11) {
                         } else if (messageData.getDataType() == 11) {
-                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64());
                         }
                         }
                         generalTsData.setStatus((short) 1);
                         generalTsData.setStatus((short) 1);
                         tsPointData.setTsData(generalTsData);
                         tsPointData.setTsData(generalTsData);
@@ -86,18 +85,18 @@ public class DataWrite {
                 for(MessageData messageData:messageDataList){
                 for(MessageData messageData:messageDataList){
                     TsPointData tsPointData = new TsPointData();
                     TsPointData tsPointData = new TsPointData();
                     GeneralTsData generalTsData = new GeneralTsData();
                     GeneralTsData generalTsData = new GeneralTsData();
-                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(messageData.getDataKey());
-                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
-                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                    String key = getPoint_key(messageData.getDataKey(),topic);
+                    if(key != null && key.length()>0) {
+                        tsPointData.setTagName(key);
                         generalTsData.setTs(messageData.getTs());
                         generalTsData.setTs(messageData.getTs());
                         if (messageType == 0) {
                         if (messageType == 0) {
-                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN());
                         } else if (messageType == 10) {
                         } else if (messageType == 10) {
-                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32());
                         } else if (messageType == 8) {
                         } else if (messageType == 8) {
-                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64());
                         } else if (messageType == 11) {
                         } else if (messageType == 11) {
-                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64());
                         }
                         }
                         generalTsData.setStatus((short) 1);
                         generalTsData.setStatus((short) 1);
                         tsPointData.setTsData(generalTsData);
                         tsPointData.setTsData(generalTsData);
@@ -109,19 +108,19 @@ public class DataWrite {
                 for(int i = 0; i < gyMessage.getMessageDataList().size();i++){
                 for(int i = 0; i < gyMessage.getMessageDataList().size();i++){
                     TsPointData tsPointData = new TsPointData();
                     TsPointData tsPointData = new TsPointData();
                     GeneralTsData generalTsData = new GeneralTsData();
                     GeneralTsData generalTsData = new GeneralTsData();
-                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(gyMessage.getMessageMutable().getKey()+ i);
-                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
+                    String key = getPoint_key(gyMessage.getMessageMutable().getKey()+ i,topic);
+                    if(key != null && key.length()>0) {
                         MessageData messageData = gyMessage.getMessageDataList().get(i);
                         MessageData messageData = gyMessage.getMessageDataList().get(i);
-                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                        tsPointData.setTagName(key);
                         generalTsData.setTs(messageData.getTs());
                         generalTsData.setTs(messageData.getTs());
                         if (messageType == 0) {
                         if (messageType == 0) {
-                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN());
                         } else if (messageType == 10) {
                         } else if (messageType == 10) {
-                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32());
                         } else if (messageType == 8) {
                         } else if (messageType == 8) {
-                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64());
                         } else if (messageType == 11) {
                         } else if (messageType == 11) {
-                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64());
                         }
                         }
                         generalTsData.setStatus((short) 1);
                         generalTsData.setStatus((short) 1);
                         tsPointData.setTsData(generalTsData);
                         tsPointData.setTsData(generalTsData);
@@ -133,18 +132,18 @@ public class DataWrite {
                 for(MessageData messageData:messageDataList){
                 for(MessageData messageData:messageDataList){
                     TsPointData tsPointData = new TsPointData();
                     TsPointData tsPointData = new TsPointData();
                     GeneralTsData generalTsData = new GeneralTsData();
                     GeneralTsData generalTsData = new GeneralTsData();
-                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(messageData.getDataKey());
-                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
-                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                    String key = getPoint_key(messageData.getDataKey(),topic);
+                    if(key != null && key.length()>0) {
+                        tsPointData.setTagName(key);
                         generalTsData.setTs(ts);
                         generalTsData.setTs(ts);
                         if (messageData.getDataType() == 0) {
                         if (messageData.getDataType() == 0) {
-                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN());
                         } else if (messageData.getDataType() == 10) {
                         } else if (messageData.getDataType() == 10) {
-                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32());
                         } else if (messageData.getDataType() == 8) {
                         } else if (messageData.getDataType() == 8) {
-                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64());
                         } else if (messageData.getDataType() == 11) {
                         } else if (messageData.getDataType() == 11) {
-                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64());
                         }
                         }
                         generalTsData.setStatus((short) 1);
                         generalTsData.setStatus((short) 1);
                         tsPointData.setTsData(generalTsData);
                         tsPointData.setTsData(generalTsData);
@@ -156,19 +155,19 @@ public class DataWrite {
                 for(int i = 0; i < gyMessage.getMessageDataList().size();i++){
                 for(int i = 0; i < gyMessage.getMessageDataList().size();i++){
                     TsPointData tsPointData = new TsPointData();
                     TsPointData tsPointData = new TsPointData();
                     GeneralTsData generalTsData = new GeneralTsData();
                     GeneralTsData generalTsData = new GeneralTsData();
-                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(gyMessage.getMessageMutable().getKey() + i);
-                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
+                    String key = getPoint_key(gyMessage.getMessageMutable().getKey() + i,topic);
+                    if(key != null && key.length()>0) {
                         MessageData messageData = gyMessage.getMessageDataList().get(i);
                         MessageData messageData = gyMessage.getMessageDataList().get(i);
-                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                        tsPointData.setTagName(key);
                         generalTsData.setTs(ts);
                         generalTsData.setTs(ts);
                         if (messageData.getDataType() == 0) {
                         if (messageData.getDataType() == 0) {
-                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN());
                         } else if (messageData.getDataType() == 10) {
                         } else if (messageData.getDataType() == 10) {
-                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32());
                         } else if (messageData.getDataType() == 8) {
                         } else if (messageData.getDataType() == 8) {
-                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64());
                         } else if (messageData.getDataType() == 11) {
                         } else if (messageData.getDataType() == 11) {
-                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64());
                         }
                         }
                         generalTsData.setStatus((short) 1);
                         generalTsData.setStatus((short) 1);
                         tsPointData.setTsData(generalTsData);
                         tsPointData.setTsData(generalTsData);
@@ -181,18 +180,18 @@ public class DataWrite {
                 for(MessageData messageData:messageDataList){
                 for(MessageData messageData:messageDataList){
                     TsPointData tsPointData = new TsPointData();
                     TsPointData tsPointData = new TsPointData();
                     GeneralTsData generalTsData = new GeneralTsData();
                     GeneralTsData generalTsData = new GeneralTsData();
-                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(messageData.getDataKey());
-                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0) {
-                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                    String key = getPoint_key(messageData.getDataKey(),topic);
+                    if(key != null && key.length()>0) {
+                        tsPointData.setTagName(key);
                         generalTsData.setTs(ts);
                         generalTsData.setTs(ts);
                         if (messageType == 0) {
                         if (messageType == 0) {
-                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN());
                         } else if (messageType == 10) {
                         } else if (messageType == 10) {
-                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32());
                         } else if (messageType == 8) {
                         } else if (messageType == 8) {
-                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64());
                         } else if (messageType == 11) {
                         } else if (messageType == 11) {
-                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64());
                         }
                         }
                         generalTsData.setStatus((short) 1);
                         generalTsData.setStatus((short) 1);
                         tsPointData.setTsData(generalTsData);
                         tsPointData.setTsData(generalTsData);
@@ -206,18 +205,18 @@ public class DataWrite {
                     TsPointData tsPointData = new TsPointData();
                     TsPointData tsPointData = new TsPointData();
                     GeneralTsData generalTsData = new GeneralTsData();
                     GeneralTsData generalTsData = new GeneralTsData();
                     MessageData messageData = gyMessage.getMessageDataList().get(i);
                     MessageData messageData = gyMessage.getMessageDataList().get(i);
-                    DBPointMapper dbPointMapper = readDBPointData.selectBykey(gyMessage.getMessageMutable().getKey() + i);
-                    if(dbPointMapper.getPoint_key() != null && dbPointMapper.getPoint_key().length()>0){
-                        tsPointData.setTagName(dbPointMapper.getPoint_key());
+                    String key = getPoint_key(gyMessage.getMessageMutable().getKey() + i,topic);
+                    if(key != null && key.length()>0){
+                        tsPointData.setTagName(key);
                         generalTsData.setTs(ts);
                         generalTsData.setTs(ts);
                         if(messageType == 0){
                         if(messageType == 0){
-                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN().get());
+                            generalTsData.setBooleanValue(messageData.getGYFP2_TYPE_BOOLEAN());
                         }else if(messageType == 10){
                         }else if(messageType == 10){
-                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32().get());
+                            generalTsData.setFloatValue(messageData.getGYFP2_TYPE_FLOAT32());
                         }else if(messageType == 8){
                         }else if(messageType == 8){
-                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64().get());
+                            generalTsData.setLongValue(messageData.getGYFP2_TYPE_INT64());
                         }else if(messageType == 11){
                         }else if(messageType == 11){
-                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64().get());
+                            generalTsData.setDoubleValue(messageData.getGYFP2_TYPE_FLOAT64());
                         }
                         }
                         generalTsData.setStatus((short) 1);
                         generalTsData.setStatus((short) 1);
                         tsPointData.setTsData(generalTsData);
                         tsPointData.setTsData(generalTsData);
@@ -225,12 +224,23 @@ public class DataWrite {
                     }
                     }
                 }
                 }
             }
             }
-            Date sTdate = new Date();
-            sqlService.saveDataSnaps(tsDataList);
-            Date enDate = new Date();
-            log.info("写入golden时间"+(enDate.getTime()-sTdate.getTime()));
+            iLatestDao.writeLatest(tsDataList);
         }catch (Exception e){
         }catch (Exception e){
-            log.error("datawrite发生异常",e);
+            log.error("datawrite error",e);
+        }
+    }
+
+    public String getPoint_key(int datakey,String topic) {
+        Map<String, DBPointMapper> map = cacheService.getMap();
+        if (map.isEmpty()) {
+            return null;
+        } else {
+            for (Map.Entry<String, DBPointMapper> entry : map.entrySet()) {
+                if (entry.getValue().getGyfp2_id().equals(topic) && entry.getValue().getGyfp2_address() == datakey) {
+                    return entry.getKey();
+                }
+            }
+            return null;
         }
         }
     }
     }
 
 

+ 16 - 0
loader/src/main/java/com/gyee/edge/loader/data/Coordinate.java

@@ -0,0 +1,16 @@
+package com.gyee.edge.loader.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+
+@Data
+@AllArgsConstructor
+public class Coordinate {
+
+    private double latitude;
+
+    private double longitude;
+
+}
+

+ 13 - 5
loader/src/main/java/com/gyee/edge/loader/data/GeneralTsData.java

@@ -1,19 +1,27 @@
 package com.gyee.edge.loader.data;
 package com.gyee.edge.loader.data;
 
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 
 import java.io.Serializable;
 import java.io.Serializable;
+import java.util.Optional;
 
 
 
 
 @Data
 @Data
-public class GeneralTsData implements Serializable {
+@AllArgsConstructor
+@NoArgsConstructor
+public class GeneralTsData implements TsData {
 
 
     private long ts;
     private long ts;
     private short status;
     private short status;
-    private Double doubleValue;
-    private Long longValue;
-    private Boolean booleanValue;
-    private Float floatValue;
+    private Optional<Double> doubleValue;
+    private Optional<Long> longValue;
+    private Optional<Boolean> booleanValue;
+    private Optional<String> stringValue;
+    private Optional<String> blobValue;
+    private Optional<Coordinate> coordinateValue;
+    private Optional<Float> floatValue;
 
 
 
 
 
 

+ 13 - 0
loader/src/main/java/com/gyee/edge/loader/data/TsData.java

@@ -0,0 +1,13 @@
+package com.gyee.edge.loader.data;
+
+
+public interface TsData {
+
+    long getTs();
+
+    short getStatus();
+
+    //double getValue();
+
+}
+

+ 13 - 0
loader/src/main/java/com/gyee/edge/loader/data/TsDataType.java

@@ -0,0 +1,13 @@
+package com.gyee.edge.loader.data;
+
+public enum TsDataType {
+    LONG,
+    DOUBLE,
+    BOOLEAN,
+    STRING,
+    BLOB,
+    COORDINATE,
+    FLOAT
+
+}
+

+ 19 - 32
loader/src/main/java/com/gyee/edge/loader/data/TsPointData.java

@@ -2,53 +2,40 @@ package com.gyee.edge.loader.data;
 
 
 
 
 
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 
 
 
-import java.io.Serializable;
-
 
 
 @Data
 @Data
-public class TsPointData implements Serializable {
+@AllArgsConstructor
+@NoArgsConstructor
+public class TsPointData  {
 
 
     private String tagName;
     private String tagName;
     private GeneralTsData tsData;
     private GeneralTsData tsData;
 
 
-    /*public TsDataType findDataType() {
-        if (tsData.getDoubleValue().isPresent()){
+    public TsDataType findDataType() {
+        if (tsData.getDoubleValue() != null) {
             return TsDataType.DOUBLE;
             return TsDataType.DOUBLE;
-        }
-         if (tsData.getBooleanValue().isPresent()){
+        } else if (tsData.getBooleanValue() != null) {
             return TsDataType.BOOLEAN;
             return TsDataType.BOOLEAN;
-        }
-        else if (tsData.getLongValue().isPresent()){
+        } else if (tsData.getLongValue() != null) {
             return TsDataType.LONG;
             return TsDataType.LONG;
-        }
-
-        else if (tsData.getStringValue().isPresent()){
+        } else if (tsData.getStringValue() != null) {
             return TsDataType.STRING;
             return TsDataType.STRING;
-        }
-
-        else if (tsData.getBlobValue().isPresent()){
+        } else if (tsData.getBlobValue() != null) {
             return TsDataType.BLOB;
             return TsDataType.BLOB;
-        }
+        } else if (tsData.getCoordinateValue() != null) {
+            return TsDataType.COORDINATE;
+        } else if (tsData.getFloatValue() != null) {
+            return TsDataType.FLOAT;
+        } else {
+            return TsDataType.DOUBLE;
 
 
+        }
 
 
-        return TsDataType.DOUBLE;
-    }*/
-    /**
-     * 获取 double 类型值
-     * @return
-     */
-//    public Optional<Double> getValue() {
-//        if (tsData.getDoubleValue().isPresent()) {
-//            return tsData.getDoubleValue().get();
-//        } else if (tsData.getBooleanValue().isPresent()) {
-//            return tsData.getBooleanValue().get() ? 1.0 : 0.0;
-//        } else if (tsData.getLongValue().isPresent()) {
-//            return tsData.getLongValue().get();
-//        }
-//        return 0.0;
-//    }
+    }
 }
 }
 
 

+ 259 - 0
loader/src/main/java/com/gyee/edge/loader/golden/GoldenLatestDao.java

@@ -0,0 +1,259 @@
+package com.gyee.edge.loader.golden;
+
+import com.gyee.edge.loader.data.TsDataType;
+import com.gyee.edge.loader.data.TsPointData;
+import com.gyee.edge.loader.golden.config.GoldenConfig;
+import com.rtdb.enums.Quality;
+import com.rtdb.model.*;
+import com.rtdb.service.impl.BaseImpl;
+import com.rtdb.service.impl.ServerImpl;
+import com.rtdb.service.impl.SnapshotImpl;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+@Component
+public class GoldenLatestDao implements ILatestDao {
+
+    @Autowired
+    private GoldenConfig goldenConfig;
+
+
+
+    public int writeDoubleLatest(List<TsPointData> list) throws Exception {
+        ServerImpl connection = null;
+        try {
+            connection = goldenConfig.getGoldenConnectionPool().getConnection();
+            SnapshotImpl snap = new SnapshotImpl(connection);
+            BaseImpl base = new BaseImpl(connection);
+            String[] tagNames = list.stream().map(TsPointData::getTagName).toArray(String[]::new);
+            List<MinPoint> pointList = base.findPoints(tagNames);
+            List<DoubleData> doubleList = new ArrayList<>();
+            for (int i = 0; i < list.size(); i++) {
+                Date time = new Date();
+                time.setTime(list.get(0).getTsData().getTs());
+                DoubleData data = new DoubleData(pointList.get(i).getId(), time, list.get(i).getTsData().getDoubleValue().get()
+                        , (short) 0, 0);
+                doubleList.add(data);
+            }
+            int count = snap.putDoubleSnapshots(doubleList);
+            return count;
+        } finally {
+            if (connection != null)
+                goldenConfig.getGoldenConnectionPool().returnConnection(connection);
+        }
+    }
+
+    @Override
+    public int writeStringLatest(List<TsPointData> list) throws Exception {
+        ServerImpl connection = null;
+        try {
+            connection = goldenConfig.getGoldenConnectionPool().getConnection();
+            SnapshotImpl snap = new SnapshotImpl(connection);
+            BaseImpl base = new BaseImpl(connection);
+            String[] tagNames = list.stream().map(TsPointData::getTagName).toArray(String[]::new);
+            List<MinPoint> pointList = base.findPoints(tagNames);
+            List<BlobData> blobList = new ArrayList<>();
+            for (int i = 0; i < list.size(); i++) {
+                BlobData pointData = new BlobData();
+                Date date = new Date();
+                date.setTime(list.get(i).getTsData().getTs());
+                pointData.setDatetime((date));
+                pointData.setId(pointList.get(i).getId());
+                pointData.setQuality(Quality.GOOD);
+                pointData.setBlob(list.get(i).getTsData().getBlobValue().get().getBytes());
+                blobList.add(pointData);
+            }
+            int count = snap.putBlobSnapshots(blobList);
+            return count;
+        } finally {
+            if (connection != null)
+                goldenConfig.getGoldenConnectionPool().returnConnection(connection);
+        }
+
+    }
+
+    @Override
+    public int writeBooleanLatest(List<TsPointData> list) throws Exception {
+        ServerImpl connection = null;
+        try {
+            connection = goldenConfig.getGoldenConnectionPool().getConnection();
+            SnapshotImpl snap = new SnapshotImpl(connection);
+            BaseImpl base = new BaseImpl(connection);
+            String[] tagNames = list.stream().map(TsPointData::getTagName).toArray(String[]::new);
+            List<MinPoint> pointList = base.findPoints(tagNames);
+            List<IntData> intList = new ArrayList<>();
+            for (int i = 0; i < list.size(); i++) {
+                IntData pointData = new IntData();
+                Date date = new Date();
+                date.setTime(list.get(i).getTsData().getTs());
+                pointData.setDateTime(date);
+                pointData.setId(pointList.get(i).getId());
+                pointData.setQuality((short) 0);
+                pointData.setValue(list.get(i).getTsData().getBooleanValue().get() ? 1 : 0);
+                intList.add(pointData);
+            }
+            int count = snap.putIntSnapshots(intList);
+            return count;
+        } finally {
+            if (connection != null)
+                goldenConfig.getGoldenConnectionPool().returnConnection(connection);
+        }
+    }
+
+    @Override
+    public int writeLongLatest(List<TsPointData> list) throws Exception {
+      
+        ServerImpl connection = null;
+        try {
+            connection = goldenConfig.getGoldenConnectionPool().getConnection();
+            SnapshotImpl snap = new SnapshotImpl(connection);
+            BaseImpl base = new BaseImpl(connection);
+            String[] tagNames = list.stream().map(TsPointData::getTagName).toArray(String[]::new);
+            List<MinPoint> pointList = base.findPoints(tagNames);
+            List<DatetimeData> datetimeList = new ArrayList<>();
+            for (int i = 0; i < list.size(); i++) {
+
+                DatetimeData pointData = new DatetimeData();
+                Date date = new Date();
+                date.setTime(list.get(i).getTsData().getTs());
+                pointData.setDateTime(date);
+                pointData.setId(pointList.get(i).getId());
+                pointData.setQuality((short) 0);
+                date.setTime(list.get(i).getTsData().getLongValue().get());
+                pointData.setValue(date);
+                datetimeList.add(pointData);
+            }
+            int count = snap.putDatetimeSnapshots(datetimeList).getSucCount();
+            return count;
+        } finally {
+            if (connection != null)
+                goldenConfig.getGoldenConnectionPool().returnConnection(connection);
+        }
+    }
+
+    @Override
+    public int writeFloatLatest(List<TsPointData> list) throws Exception {
+        ServerImpl connection = null;
+        try {
+            connection = goldenConfig.getGoldenConnectionPool().getConnection();
+            SnapshotImpl snap = new SnapshotImpl(connection);
+            BaseImpl base = new BaseImpl(connection);
+            String[] tagNames = list.stream().map(TsPointData::getTagName).toArray(String[]::new);
+            List<MinPoint> pointList = base.findPoints(tagNames);
+            List<DoubleData> doubleList = new ArrayList<>();
+            for (int i = 0; i < list.size(); i++) {
+                Date time = new Date();
+                time.setTime(list.get(0).getTsData().getTs());
+                DoubleData data = new DoubleData(pointList.get(i).getId(), time, list.get(i).getTsData().getFloatValue().get()
+                        , (short) 0, 0);
+                doubleList.add(data);
+            }
+            int count = snap.putDoubleSnapshots(doubleList);
+            return count;
+        } finally {
+            if (connection != null)
+                goldenConfig.getGoldenConnectionPool().returnConnection(connection);
+        }
+    }
+
+    @Override
+    public int writeBlobLatest(List<TsPointData> list) throws Exception {
+        ServerImpl connection = null;
+        try {
+            connection = goldenConfig.getGoldenConnectionPool().getConnection();
+            SnapshotImpl snap = new SnapshotImpl(connection);
+            BaseImpl base = new BaseImpl(connection);
+            String[] tagNames = list.stream().map(TsPointData::getTagName).toArray(String[]::new);
+            List<MinPoint> pointList = base.findPoints(tagNames);
+            List<BlobData> dataList = new ArrayList<>();
+            for (int i = 0; i < list.size(); i++) {
+
+                BlobData pointData = new BlobData();
+                Date date = new Date();
+                date.setTime(list.get(i).getTsData().getTs());
+                pointData.setDatetime((date));
+                pointData.setId(pointList.get(i).getId());
+                pointData.setQuality(Quality.GOOD);
+                pointData.setBlob(list.get(i).getTsData().getBlobValue().get().getBytes());
+                pointData.setLen(list.get(i).getTsData().getBlobValue().get().getBytes().length);
+                dataList.add(pointData);
+            }
+            int count = snap.putBlobSnapshots(dataList);
+            return count;
+
+        } finally {
+            if (connection != null)
+                goldenConfig.getGoldenConnectionPool().returnConnection(connection);
+        }
+    }
+
+    @Override
+    public int writeCoordinateLatest(List<TsPointData> list) throws Exception {
+        ServerImpl connection = null;
+        try {
+            connection = goldenConfig.getGoldenConnectionPool().getConnection();
+            SnapshotImpl snap = new SnapshotImpl(connection);
+            BaseImpl base = new BaseImpl(connection);
+            String[] tagNames = list.stream().map(TsPointData::getTagName).toArray(String[]::new);
+            List<MinPoint> pointList = base.findPoints(tagNames);
+            List<CoorData> dataList = new ArrayList<>();
+            for (int i = 0; i < list.size(); i++) {
+
+                CoorData pointData = new CoorData();
+                Date date = new Date();
+                date.setTime(list.get(i).getTsData().getTs());
+                pointData.setDateTime((date));
+                pointData.setId(pointList.get(i).getId());
+                pointData.setQuality((short) 0);
+                pointData.setX((float) list.get(i).getTsData().getCoordinateValue().get().getLongitude());
+                pointData.setY((float) list.get(i).getTsData().getCoordinateValue().get().getLatitude());
+                dataList.add(pointData);
+            }
+            int count = snap.putCoorSnapshots(dataList);
+            return count;
+
+        } finally {
+            if (connection != null)
+                goldenConfig.getGoldenConnectionPool().returnConnection(connection);
+        }
+    }
+
+    @Override
+    public boolean writeLatest(List<TsPointData> dataList) throws Exception {
+        int writeCount = 0;
+        Map<TsDataType, List<TsPointData>> pointGroup = dataList.stream().collect(Collectors.groupingBy(TsPointData::findDataType));
+        for (Map.Entry<TsDataType, List<TsPointData>> entry : pointGroup.entrySet()) {
+            List<TsPointData> pointDataList = entry.getValue();
+            if (entry.getKey() == TsDataType.DOUBLE) {
+                int count = writeDoubleLatest(pointDataList);
+                writeCount = writeCount + count;
+            } else if (entry.getKey() == TsDataType.BOOLEAN) {
+                int count = writeBooleanLatest(pointDataList);
+                writeCount = writeCount + count;
+            } else if (entry.getKey() == TsDataType.STRING) {
+                int count = writeStringLatest(pointDataList);
+                writeCount = writeCount + count;
+            } else if (entry.getKey() == TsDataType.BLOB) {
+                int count = writeBlobLatest(pointDataList);
+                writeCount = writeCount + count;
+            } else if (entry.getKey() == TsDataType.COORDINATE) {
+                int count = writeCoordinateLatest(pointDataList);
+                writeCount = writeCount + count;
+            } else if (entry.getKey() == TsDataType.LONG) {
+                int count = writeLongLatest(pointDataList);
+                writeCount = writeCount + count;
+            } else if (entry.getKey() == TsDataType.FLOAT) {
+                int count = writeFloatLatest(pointDataList);
+                writeCount = writeCount + count;
+            }
+        }
+        return writeCount > 0 ? true : false;
+
+    }
+
+
+
+}

+ 26 - 0
loader/src/main/java/com/gyee/edge/loader/golden/ILatestDao.java

@@ -0,0 +1,26 @@
+package com.gyee.edge.loader.golden;
+
+
+import com.gyee.edge.loader.data.TsPointData;
+
+import java.util.List;
+
+public interface ILatestDao {
+
+    int writeDoubleLatest(List<TsPointData> list) throws Exception;
+
+    int writeStringLatest(List<TsPointData> list) throws Exception;
+
+    int writeBooleanLatest(List<TsPointData> list) throws Exception;
+
+    int writeLongLatest(List<TsPointData> list) throws Exception;
+
+    int writeBlobLatest(List<TsPointData> list) throws Exception;
+
+    int writeCoordinateLatest(List<TsPointData> list) throws Exception;
+
+    int writeFloatLatest(List<TsPointData> list) throws Exception;
+
+    boolean writeLatest(List<TsPointData> dataList) throws Exception;
+
+}

+ 62 - 0
loader/src/main/java/com/gyee/edge/loader/golden/config/GoldenConfig.java

@@ -0,0 +1,62 @@
+package com.gyee.edge.loader.golden.config;
+
+
+import com.rtdb.service.impl.ServerImpl;
+import com.rtdb.service.impl.ServerImplPool;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+//@PropertySource("classpath:/golden.properties")
+public class GoldenConfig {
+
+    @Value("${golden.server_ip:127.0.0.1}")
+    private String serverIp;
+
+    @Value("${golden.server_port:6327}")
+    private int serverPort;
+
+    @Value("${golden.user_name:sa}")
+    private String userName;
+
+    @Value("${golden.password:golden}")
+    private String password;
+
+    @Value("${golden.pool_size:10}")
+    private int poolSize;
+
+    @Value("${golden.max_pool_size:100}")
+    private int maxPoolSize;
+
+    @Value("${golden.query_history_limit:100000}")
+    private int queryHistoryLimit;
+
+    private ServerImplPool pool;
+
+    private GoldenConnectionPool connectionPool;
+
+    public ServerImplPool getServerImplPool() {
+        if (pool == null) {
+            pool = new ServerImplPool(serverIp, serverPort, userName,password, poolSize, maxPoolSize);
+        }
+        return pool;
+    }
+
+    public GoldenConnectionPool getGoldenConnectionPool() {
+        if (connectionPool == null) {
+            connectionPool = new GoldenConnectionPool();
+            connectionPool.init(serverIp, (short)serverPort, userName,password, poolSize,1, maxPoolSize);
+            System.out.println("golden server ip = " +  serverIp);
+        }
+        return connectionPool;
+    }
+
+    public ServerImpl getServerImpl() throws Exception {
+        return new ServerImpl(serverIp, serverPort, userName, password);
+    }
+
+    public int getQueryHistoryLimit() {
+        return queryHistoryLimit;
+    }
+
+}

+ 361 - 0
loader/src/main/java/com/gyee/edge/loader/golden/config/GoldenConnectionPool.java

@@ -0,0 +1,361 @@
+package com.gyee.edge.loader.golden.config;
+
+import com.rtdb.service.impl.ServerImpl;
+import org.springframework.stereotype.Component;
+
+import java.util.Enumeration;
+import java.util.Vector;
+
+@Component
+public class GoldenConnectionPool {
+
+    private String serverIp = "127.0.0.1"; // 服务器IP
+    private short serverPort = 10010; // 服务器端口
+    private String userName = "sa";
+    private String password = "golden";
+    private int defaultConnections = 1; // 连接池的初始大小
+    private int incrementalConnections = 1;// 连接池自动增加的大小
+    private int maxConnections = 10; // 连接池最大的大小
+    private Vector connections = null; // 存放连接池中连接的向量 , 初始时为 null
+
+
+    public void  init(String serverIp, short serverPort, String userName, String password, int defaultConnections, int incrementalConnections, int maxConnections) {
+        this.serverIp = serverIp;
+        this.serverPort = serverPort;
+        this.userName = userName;
+        this.password = password;
+        this.defaultConnections = defaultConnections;
+        this.incrementalConnections = incrementalConnections;
+        this.maxConnections = maxConnections;
+    }
+
+    /**
+     * 创建一个连接池,连接池中的可用连接的数量采用类成员 defaultConnections 中设置的值
+     */
+    private synchronized void createPool() throws Exception {
+        // 确保连接池没有创建
+        // 如果连接池己经创建了,保存连接的向量 connections 不会为空
+        if (connections != null) {
+            return; // 如果己经创建,则返回
+        }
+        // 创建保存连接的向量 , 初始时有 0 个元素
+        connections = new Vector();
+        // 根据 defaultConnections 中设置的值,创建连接。
+        createConnections(this.defaultConnections);
+        System.out.println(" Golden连接池创建成功! ");
+    }
+
+    /**
+     * 创建由 numConnections 指定数目的Golden连接 , 并把这些连接
+     * 放入 connections 向量中
+     * @param numConnections  要创建的Golden连接的数目
+     */
+    private void createConnections(int numConnections) throws Exception {
+        // 循环创建指定数目的Golden连接
+        for (int x = 0; x < numConnections; x++) {
+            // 是否连接池中的Golden连接的数量己经达到最大?最大值由类成员 maxConnections
+            // 指出,如果 maxConnections 为 0 或负数,表示连接数量没有限制。
+            // 如果连接数己经达到最大,即退出。
+            if (this.maxConnections > 0  && this.connections.size() >= this.maxConnections) {
+                break;
+            }
+
+            // add a new PooledConnection object to connections vector
+            // 增加一个连接到连接池中(向量 connections 中)
+            try {
+                connections.addElement(new PooledConnection(newConnection()));
+
+            } catch (Exception e) {
+                System.out.println(" 创建Golden连接失败! " + e.getMessage());
+                throw new Exception();
+            }
+            System.out.println(" Golden连接己创建 ......");
+        }
+    }
+
+    /**
+     * 创建一个新的Golden连接并返回它
+     * @return 返回一个新创建的Golden连接
+     */
+    private ServerImpl newConnection() throws Exception {
+        // 创建一个Golden连接
+//        String file = RtdbServerImpl.class.getProtectionDomain().getCodeSource().getLocation().getFile();
+//        System.out.println("fffffffffffffffffffff = " + file);
+//        if (file.endsWith(".jar")) {
+//            System.out.println("对对对对对对对对对对对对对对");
+//        } else {
+//            System.out.println("错错错错错错错错错错错错错错");
+//        }
+
+        ServerImpl conn = new ServerImpl(serverIp, serverPort, userName, password);
+        return conn; // 返回创建的新的Golden连接
+    }
+
+    /**
+     * 通过调用 getFreeConnection() 函数返回一个可用的Golden连接 ,
+     * 如果当前没有可用的Golden连接,并且更多的Golden连接不能创建(如连接池大小的限制),此函数等待一会再尝试获取。
+     *
+     * @return 返回一个可用的Golden连接对象
+     */
+    public synchronized ServerImpl getConnection() throws Exception {
+        // 确保连接池己被创建
+        if (connections == null) {
+            createPool(); // 连接池还没创建,则返回 null
+        }
+
+        ServerImpl conn = getFreeConnection(); // 获得一个可用的Golden连接
+        // 如果目前没有可以使用的连接,即所有的连接都在使用中
+        while (conn == null) {
+            // 等一会再试
+            wait(250);
+            conn = getFreeConnection(); // 重新再试,直到获得可用的连接,如果
+            // getFreeConnection() 返回的为 null
+            // 则表明创建一批连接后也不可获得可用连接
+        }
+        return conn;// 返回获得的可用的连接
+    }
+
+    /**
+     * 本函数从连接池向量 connections 中返回一个可用的的Golden连接,如果
+     * 当前没有可用的Golden连接,本函数则根据 incrementalConnections 设置
+     * 的值创建几个Golden连接,并放入连接池中。
+     * 如果创建后,所有的连接仍都在使用中,则返回 null
+     * @return 返回一个可用的Golden连接
+     */
+    private ServerImpl getFreeConnection() throws Exception {
+        // 从连接池中获得一个可用的Golden连接
+        ServerImpl conn = findFreeConnection();
+        if (conn == null) {
+
+            // 如果目前连接池中没有可用的连接
+            // 创建一些连接
+            createConnections(incrementalConnections);
+            // 重新从池中查找是否有可用连接
+            conn = findFreeConnection();
+            if (conn == null) {
+                // 如果创建连接后仍获得不到可用的连接,则返回 null
+                return null;
+            }
+        }
+        return conn;
+    }
+
+    /**
+     * 查找连接池中所有的连接,查找一个可用的Golden连接,
+     * 如果没有可用的连接,返回 null
+     * @return 返回一个可用的Golden连接
+     */
+    private ServerImpl findFreeConnection() throws Exception {
+        ServerImpl conn = null;
+        PooledConnection pConn = null;
+        // 获得连接池向量中所有的对象
+        Enumeration enumerate = connections.elements();
+
+        // 遍历所有的对象,看是否有可用的连接
+        while (enumerate.hasMoreElements()) {
+            pConn = (PooledConnection) enumerate.nextElement();
+            if (!pConn.isBusy()) {
+                // 如果此对象不忙,则获得它的Golden连接并把它设为忙
+                conn = pConn.getConnection();
+                pConn.setBusy(true);
+                break; // 己经找到一个可用的连接,退出
+            }
+        }
+        return conn;// 返回找到到的可用连接
+    }
+
+
+    /**
+     * 此函数返回一个Golden连接到连接池中,并把此连接置为空闲。
+     * 所有使用连接池获得的Golden连接均应在不使用此连接时返回它。
+     */
+    public void returnConnection(ServerImpl conn) {
+        // 确保连接池存在,如果连接没有创建(不存在),直接返回
+        if (connections == null) {
+            System.out.println(" 连接池不存在,无法返回此连接到连接池中 !");
+            return;
+        }
+
+        PooledConnection pConn = null;
+        Enumeration enumerate = connections.elements();
+        // 遍历连接池中的所有连接,找到这个要返回的连接对象
+        while (enumerate.hasMoreElements()) {
+            pConn = (PooledConnection) enumerate.nextElement();
+            // 先找到连接池中的要返回的连接对象
+            if (conn == pConn.getConnection()) {
+                // 找到了 , 设置此连接为空闲状态
+                pConn.setBusy(false);
+                break;
+            }
+        }
+    }
+
+    /**
+     * 刷新连接池中所有的连接对象
+     */
+    public synchronized void refreshConnections() throws Exception {
+        // 确保连接池己创新存在
+        if (connections == null) {
+            System.out.println(" 连接池不存在,无法刷新 !");
+            return;
+        }
+
+        PooledConnection pConn = null;
+        Enumeration enumerate = connections.elements();
+        while (enumerate.hasMoreElements()) {
+
+            // 获得一个连接对象
+            pConn = (PooledConnection) enumerate.nextElement();
+            // 如果对象忙则等 5 秒 ,5 秒后直接刷新
+            if (pConn.isBusy()) {
+                wait(5000); // 等 5 秒
+            }
+            // 关闭此连接,用一个新的连接代替它。
+            closeConnection(pConn.getConnection());
+            pConn.setConnection(newConnection());
+            pConn.setBusy(false);
+        }
+    }
+
+    /**
+     * 关闭连接池中所有的连接,并清空连接池。
+     */
+    private synchronized void closeConnectionPool() throws Exception {
+        // 确保连接池存在,如果不存在,返回
+        if (connections == null) {
+            System.out.println(" 连接池不存在,无法关闭 !");
+            return;
+        }
+
+        PooledConnection pConn = null;
+        Enumeration enumerate = connections.elements();
+        while (enumerate.hasMoreElements()) {
+            pConn = (PooledConnection) enumerate.nextElement();
+            // 如果忙,等 5 秒
+            if (pConn.isBusy()) {
+                wait(5000); // 等 5 秒
+            }
+            // 5 秒后直接关闭它
+            closeConnection(pConn.getConnection());
+            // 从连接池向量中删除它
+            connections.removeElement(pConn);
+        }
+        // 置连接池为空
+        connections = null;
+    }
+
+    /**
+     * 关闭一个Golden连接
+     */
+    private void closeConnection(ServerImpl conn) {
+        try {
+            conn.close();
+        } catch (Exception e) {
+            System.out.println(" 关闭Golden连接出错: " + e.getMessage());
+        }
+    }
+
+    @Override
+    public void finalize() {
+        try {
+            closeConnectionPool();
+        } catch (Exception e) {
+            System.out.println(" 关闭Golden连接池出错: " + e.getMessage());
+        }
+    }
+
+
+    /**
+     * 使程序等待给定的毫秒数
+     */
+    private void wait(int mSeconds) {
+        try {
+            Thread.sleep(mSeconds);
+
+        } catch (InterruptedException e) {
+        }
+    }
+
+    /**
+     * 返回连接池的初始大小
+     *
+     * @return 初始连接池中可获得的连接数量
+     */
+    public int getDefaultConnections() {
+        return this.defaultConnections;
+    }
+
+    /**
+     * 设置连接池的初始大小
+     */
+    public void setDefaultConnections(int defaultConnections) {
+        this.defaultConnections = defaultConnections;
+    }
+
+    /**
+     * 返回连接池自动增加的大小 、
+     * @return 连接池自动增加的大小
+     */
+    public int getIncrementalConnections() {
+        return this.incrementalConnections;
+    }
+
+    /**
+     * 设置连接池自动增加的大小
+     */
+    public void setIncrementalConnections(int incrementalConnections) {
+        this.incrementalConnections = incrementalConnections;
+    }
+
+    /**
+     * 返回连接池中最大的可用连接数量
+     * @return 连接池中最大的可用连接数量
+     */
+    public int getMaxConnections() {
+        return this.maxConnections;
+    }
+
+    /**
+     * 设置连接池中最大可用的连接数量
+     */
+    public void setMaxConnections(int maxConnections) {
+        this.maxConnections = maxConnections;
+    }
+
+    /**
+     * 内部使用的用于保存连接池中连接对象的类
+     * 此类中有两个成员,一个是Golden的连接,另一个是指示此连接是否
+     * 正在使用的标志。
+     */
+    class PooledConnection {
+        ServerImpl connection = null;// Golden连接
+        boolean busy = false; // 此连接是否正在使用的标志,默认没有正在使用
+
+        // 构造函数,根据一个 Connection 构告一个 PooledConnection 对象
+        public PooledConnection(ServerImpl connection) {
+            this.connection = connection;
+        }
+
+        // 返回此对象中的连接
+        public ServerImpl getConnection() {
+            return connection;
+        }
+
+        // 设置此对象的,连接
+        public void setConnection(ServerImpl connection) {
+            this.connection = connection;
+        }
+
+        // 获得对象连接是否忙
+        public boolean isBusy() {
+            return busy;
+        }
+
+        // 设置对象的连接正在忙
+        public void setBusy(boolean busy) {
+            this.busy = busy;
+        }
+
+    }
+
+}

+ 2 - 1
loader/src/main/java/com/gyee/edge/loader/mapper/DBPointMapper.java

@@ -5,11 +5,12 @@ import lombok.Data;
 /**
 /**
  * @author :
  * @author :
  * @date :Created in 2022/8/25 17:34
  * @date :Created in 2022/8/25 17:34
- * @description:  gyfp2协议与写入数据配置实体
+ * @description:  gyfp2协议与写入数据配置类
  */
  */
 
 
 @Data
 @Data
 public class DBPointMapper {
 public class DBPointMapper {
+    private String gyfp2_id;
     private int gyfp2_address;
     private int gyfp2_address;
     private int gyfp2_valid;
     private int gyfp2_valid;
     private float gyfp2_coeff;
     private float gyfp2_coeff;

+ 6 - 9
loader/src/main/java/com/gyee/edge/loader/rocketmq/comuser/BalanceComuser.java

@@ -1,6 +1,5 @@
 package com.gyee.edge.loader.rocketmq.comuser;
 package com.gyee.edge.loader.rocketmq.comuser;
 
 
-import com.google.protobuf.InvalidProtocolBufferException;
 import com.gyee.edge.loader.adapter.DataWrite;
 import com.gyee.edge.loader.adapter.DataWrite;
 
 
 import com.gyee.protocol.gyfp2.message.GYMessage;
 import com.gyee.protocol.gyfp2.message.GYMessage;
@@ -15,7 +14,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
-import java.util.Date;
 import java.util.List;
 import java.util.List;
 
 
 @Slf4j
 @Slf4j
@@ -25,13 +23,16 @@ public class BalanceComuser {
     @Value("${rocketmq.namesrvaddr}")
     @Value("${rocketmq.namesrvaddr}")
     String nameSrvAddr;
     String nameSrvAddr;
 
 
+    @Value("${rocketmq.topic}")
+    String topic;
+
     @Autowired
     @Autowired
     private DataWrite dataWrite;
     private DataWrite dataWrite;
 
 
     public void DataConsumer() throws Exception {
     public void DataConsumer() throws Exception {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_sonsumer");
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_sonsumer");
         consumer.setNamesrvAddr(nameSrvAddr);
         consumer.setNamesrvAddr(nameSrvAddr);
-        consumer.subscribe("keyMessage", "*");
+        consumer.subscribe(topic, "*");
         consumer.setMessageModel(MessageModel.CLUSTERING);
         consumer.setMessageModel(MessageModel.CLUSTERING);
         consumer.registerMessageListener(new MessageListenerConcurrently() {
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             @Override
@@ -44,14 +45,10 @@ public class BalanceComuser {
                         try {
                         try {
                             //反序列化
                             //反序列化
                             byte[] bytes = msg.getBody();
                             byte[] bytes = msg.getBody();
-                            Date sTdate = new Date();
                             GYMessage gyMessage = GYMessage.toObject(bytes);
                             GYMessage gyMessage = GYMessage.toObject(bytes);
-                            Date eNdate = new Date();
-                            log.info("反序列化时间"+(eNdate.getTime()-sTdate.getTime()));
-                            log.info("收到消息" + "topic" + topic + ",tags" + tags + ",msg" + gyMessage);
                             //数据写入
                             //数据写入
-                            dataWrite.DataWriteGloden(gyMessage);
-                        } catch (InvalidProtocolBufferException e) {
+                            dataWrite.DataWriteGloden(gyMessage,topic+tags);
+                        } catch (Exception e) {
                             e.printStackTrace();
                             e.printStackTrace();
                         }
                         }
                     }
                     }

+ 8 - 4
loader/src/main/java/com/gyee/edge/loader/sqlite/ReadDBPointData.java

@@ -38,23 +38,26 @@ public class ReadDBPointData {
                 dbPointMapper.setPoint_ratio(rs.getInt("point_ratio"));
                 dbPointMapper.setPoint_ratio(rs.getInt("point_ratio"));
                 dbPointMappers.add(dbPointMapper);
                 dbPointMappers.add(dbPointMapper);
             }
             }
-            db.releaseResource();
         } catch (Exception e) {
         } catch (Exception e) {
             log.info("sqlite查询失败",e);
             log.info("sqlite查询失败",e);
             e.printStackTrace();
             e.printStackTrace();
+        }finally {
+            db.releaseResource();
         }
         }
         return dbPointMappers;
         return dbPointMappers;
     }
     }
 
 
-    public DBPointMapper selectBykey(int gyfp2_address) {
+    public DBPointMapper selectBykey(int gyfp2_address,String gyfp2_id) {
         DBPointMapper dbPointMapper = new DBPointMapper();
         DBPointMapper dbPointMapper = new DBPointMapper();
         try {
         try {
             Connection conn = db.getConnection();
             Connection conn = db.getConnection();
-            String sql = "select * from dbpointmapper where gyfp2_address = ?";
+            String sql = "select * from dbpointmapper where gyfp2_address = ? and gyfp2_id = ? ";
             PreparedStatement ps = conn.prepareStatement(sql);
             PreparedStatement ps = conn.prepareStatement(sql);
             ps.setInt(1,gyfp2_address);
             ps.setInt(1,gyfp2_address);
+            ps.setString(2,gyfp2_id);
             ResultSet rs = ps.executeQuery();
             ResultSet rs = ps.executeQuery();
             while (rs.next()) {
             while (rs.next()) {
+                dbPointMapper.setGyfp2_id(rs.getString("gyfp2_id"));
                 dbPointMapper.setGyfp2_address(rs.getInt("gyfp2_address"));
                 dbPointMapper.setGyfp2_address(rs.getInt("gyfp2_address"));
                 dbPointMapper.setGyfp2_valid(rs.getInt("gyfp2_valid"));
                 dbPointMapper.setGyfp2_valid(rs.getInt("gyfp2_valid"));
                 dbPointMapper.setGyfp2_coeff(rs.getFloat("gyfp2_coeff"));
                 dbPointMapper.setGyfp2_coeff(rs.getFloat("gyfp2_coeff"));
@@ -64,10 +67,11 @@ public class ReadDBPointData {
                 dbPointMapper.setPoint_key(rs.getString("point_key"));
                 dbPointMapper.setPoint_key(rs.getString("point_key"));
                 dbPointMapper.setPoint_ratio(rs.getInt("point_ratio"));
                 dbPointMapper.setPoint_ratio(rs.getInt("point_ratio"));
             }
             }
-            db.releaseResource();
         } catch (Exception e) {
         } catch (Exception e) {
             log.info("sqlite查询失败",e);
             log.info("sqlite查询失败",e);
             e.printStackTrace();
             e.printStackTrace();
+        }finally {
+            db.releaseResource();
         }
         }
         return dbPointMapper;
         return dbPointMapper;
     }
     }

+ 63 - 0
loader/src/main/java/com/gyee/edge/loader/sqlite/cache/CacheService.java

@@ -0,0 +1,63 @@
+package com.gyee.edge.loader.sqlite.cache;
+
+
+import com.gyee.edge.loader.mapper.DBPointMapper;
+import com.gyee.edge.loader.sqlite.config.Database;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+@Component
+public class CacheService {
+
+    @Autowired
+    private Database db;
+
+
+    private Map<String,DBPointMapper> map = new HashMap<>();
+
+    public Map<String,DBPointMapper> getMap(){
+        if (map.isEmpty()){
+            map = loadDBPointMappers();
+        }
+
+        return map;
+    }
+
+    private Map<String,DBPointMapper> loadDBPointMappers(){
+        Map<String,DBPointMapper> map = new HashMap<>();
+        try {
+            Connection conn = db.getConnection();
+            String sql = "select * from dbpointmapper";
+            PreparedStatement ps = conn.prepareStatement(sql);
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                DBPointMapper dbPointMapper = new DBPointMapper();
+                dbPointMapper.setGyfp2_id(rs.getString("gyfp2_id"));
+                dbPointMapper.setGyfp2_address(rs.getInt("gyfp2_address"));
+                dbPointMapper.setGyfp2_valid(rs.getInt("gyfp2_valid"));
+                dbPointMapper.setGyfp2_coeff(rs.getFloat("gyfp2_coeff"));
+                dbPointMapper.setGyfp2_base(rs.getFloat("gyfp2_base"));
+                dbPointMapper.setPoint_id(rs.getInt("point_id"));
+                dbPointMapper.setPoint_type(rs.getInt("point_type"));
+                dbPointMapper.setPoint_key(rs.getString("point_key"));
+                dbPointMapper.setPoint_ratio(rs.getInt("point_ratio"));
+                map.put(dbPointMapper.getPoint_key(),dbPointMapper);
+            }
+        } catch (Exception e) {
+            log.info("sqlite查询失败",e);
+            e.printStackTrace();
+        }finally {
+            db.releaseResource();
+        }
+        return map;
+    }
+
+}

+ 0 - 364
loader/src/main/java/com/gyee/edge/loader/sqlite/config/DB.java

@@ -1,364 +0,0 @@
-package com.gyee.edge.loader.sqlite.config;
-
-import java.io.Reader;
-import java.io.StringReader;
-import java.sql.*;
-import java.text.MessageFormat;
-import java.util.*;
-
-public class DB {
-    //数据库驱动,默认为Oracle
-    public static String JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver";
-
-    //数据库连接地址
-    public static String DB_URL = "";
-
-    public static String USER = "";
-
-    public static String PASS = "";
-
-    Connection CONN = null;
-
-    //PreparedStatement PSTM = null;
-
-    public DB(String url, String user, String pass) {
-        DB_URL = url;
-        USER = user;
-        PASS = pass;
-    }
-
-    public DB() {
-        /*Properties props = new Properties();
-        try {
-            String configFilePath = System.getProperty("antgis.web.oa.root")+"/WEB-INF/config.properties";
-            props.load(new FileInputStream(configFilePath));
-        }catch (FileNotFoundException e){
-
-        }catch (IOException e){
-
-        }*/
-
-
-//        JDBC_DRIVER = Config.DB_DRIVER;
-//        DB_URL = Config.DB_URL;
-//        USER = Config.DB_USER;
-//        PASS = Config.DB_PWD;
-    }
-
-    public Connection getConnection() {
-        try {
-            Class.forName(JDBC_DRIVER);
-            CONN = DriverManager.getConnection(DB_URL, USER, PASS);
-        } catch (ClassNotFoundException e) {
-
-        } catch (SQLException e) {
-
-        }
-        return CONN;
-    }
-
-    public ResultSet getResultSet(String sql) {
-        ResultSet resultSet = null;
-        //PreparedStatement pstm = null;
-        try {
-            CONN = getConnection();
-            PreparedStatement PSTM = CONN.prepareStatement(sql);
-            resultSet = PSTM.executeQuery();
-            return resultSet;
-        } catch (SQLException e) {
-            return resultSet;
-        } finally {
-
-        }
-    }
-
-    public void close() {
-        /*if(PSTM !=null) {
-            try {
-                PSTM.close();
-            } catch (SQLException e) {
-                e.printStackTrace();
-            }
-        }*/
-
-        try {
-            if (CONN != null && (!CONN.isClosed())) {
-                try {
-                    CONN.close();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                }
-            }
-        } catch (SQLException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public List<Map<String, Object>> getList(String sql) {
-        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
-        ResultSet rs = getResultSet(sql);
-        list = convertToList(rs);
-        return list;
-    }
-
-    public List<Map<String, Object>> convertToList(ResultSet rs) {
-        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
-        try {
-            ResultSetMetaData md = rs.getMetaData(); //获得结果集结构信息,元数据
-            int columnCount = md.getColumnCount();   //获得列数
-            while (rs.next()) {
-                Map<String, Object> rowData = new HashMap<String, Object>();
-                for (int i = 1; i <= columnCount; i++) {
-                    rowData.put(md.getColumnName(i), rs.getObject(i));
-                }
-                list.add(rowData);
-            }
-        } catch (SQLException e) {
-
-        }
-
-        return list;
-    }
-
-    public ResultSet getResultSet(String tableName, String filter) {
-        String sql = MessageFormat.format("select * from {0} where {1}", tableName, filter);
-        return getResultSet(sql);
-    }
-
-    public ResultSet getResultSet(String tableName, String columnList, String filter) {
-        String sql = MessageFormat.format("select {2} from {0} where {1}", tableName, filter, columnList);
-        return getResultSet(sql);
-    }
-
-    public ResultSet getResultSet(String tableName, String columnList, String filter, String orderColumnList) {
-        String sql = MessageFormat.format("select {2} from {0} where {1} order by {3}", tableName, filter, columnList, orderColumnList);
-        return getResultSet(sql);
-    }
-
-    public Object getFirstColumn(String sql) {
-        ResultSet rs = getResultSet(sql);
-        try {
-            if (rs.next()) {
-                return rs.getObject(0);
-            }
-        } catch (SQLException e) {
-
-        }
-        return null;
-    }
-
-    public String getSqlForPageSize(String tableName, Integer pageSize, Integer pageIndex, String where, String order) {
-        return getSqlForPageSize(tableName, "*", pageSize, pageIndex, where, order);
-    }
-
-    public String getSqlForPageSize(String tableName, String sColumnList, int pageSize, int pageIndex, String where, String order) {
-        String sSql = "";
-        StringBuilder sb = new StringBuilder();
-        sb.append("select " + sColumnList + " from (select t.*,rownum rno ");
-        sb.append(" from (select * from " + tableName + " ");
-        sb.append(" where " + where);
-        sb.append(order);
-        sb.append(") t where rownum <= ");
-        sb.append((pageIndex + 1) * pageSize);
-        sb.append(" )");
-        sb.append(" where rno >= ");
-        sb.append(pageIndex * pageSize + 1);
-        sSql = sb.toString();
-        return sSql;
-    }
-
-    public boolean judgeRecordExist(String tableName, String filter) {
-        ResultSet rs = getResultSet(tableName, filter);
-        try {
-            if (rs.next())
-                return true;
-            else
-                return false;
-        } catch (SQLException e) {
-            return false;
-        }
-    }
-
-    public String formatTableName(String tableName) {
-        return tableName.replace("\"", "");
-    }
-
-    public boolean judgeTableOrViewExist(String tableName) {
-        String sql = "select count(*) from user_objects where object_type in ('TABLE','VIEW') AND upper(OBJECT_NAME)=upper('" + formatTableName(tableName) + "')";
-        ResultSet rs = getResultSet(sql);
-        return judgeCountValue(rs);
-    }
-
-    public boolean judgeColumnExist(String tableName, String columnName) {
-        String sql = "select count(*) from user_tab_cols where upper(table_name)=upper('" + formatTableName(tableName) + "') and upper(COLUMN_NAME)=upper('" + columnName + "') order by COLUMN_ID";
-        ResultSet rs = getResultSet(sql);
-        return judgeCountValue(rs);
-    }
-
-    private boolean judgeCountValue(ResultSet rs) {
-        try {
-            if (rs.next()) {
-                if (Integer.parseInt(rs.getObject(0).toString()) == 1)
-                    return true;
-                else
-                    return false;
-            } else {
-                return false;
-            }
-        } catch (SQLException e) {
-            return false;
-        }
-    }
-
-    public HashMap getTableColumnType(String tableName) {
-        String sql = "select COLUMN_NAME,DATA_TYPE,DATA_LENGTH,COLUMN_ID from user_tab_cols where upper(table_name)=upper('" + formatTableName(tableName) + "') order by COLUMN_ID";
-        ResultSet rs = getResultSet(sql);
-        HashMap hm = new HashMap();
-        try {
-            while (rs.next()) {
-                String name = rs.getString("COLUMN_NAME");
-                String value = rs.getString("DATA_TYPE");
-                hm.put(name, value);
-            }
-        } catch (SQLException e) {
-
-        }
-
-        return hm;
-    }
-
-    public boolean add(String tableName, LinkedHashMap<String, Object> hm) throws Exception {
-        if (tableName == null || tableName.equals("")) {
-            throw new Exception("表名不能为空字符串!");
-        }
-
-        String sColumns = "";
-        String sValues = "";
-
-        Iterator iter = hm.entrySet().iterator();
-        while (iter.hasNext()) {
-            Map.Entry entry = (Map.Entry) iter.next();
-            sColumns += entry.getKey().toString() + ",";
-            sValues += "?,";
-        }
-        /*for(String key:hm.keySet()){
-            sColumns += key +",";
-            sValues += "?,";
-        }*/
-
-        sColumns = sColumns.substring(0, sColumns.length() - 1);
-        sValues = sValues.substring(0, sValues.length() - 1);
-        String sql = MessageFormat.format("insert into {0} ({1}) values({2})", tableName, sColumns, sValues);
-
-        return executeSql(sql, tableName, hm);
-    }
-
-    public boolean update(String tableName, LinkedHashMap<String, Object> hm, String filterColumns) throws Exception {
-        if (tableName == null || tableName.equals("")) {
-            throw new Exception("表名不能为空字符串!");
-        }
-
-        String filter = getFilterString(tableName, hm, filterColumns);
-
-        return update2(tableName, hm, filter);
-    }
-
-    public boolean update2(String tableName, LinkedHashMap<String, Object> hm, String filter) throws Exception {
-        if (tableName == null || tableName.equals("")) {
-            throw new Exception("表名不能为空字符串!");
-        }
-
-        String str = "";
-        Iterator iter = hm.entrySet().iterator();
-        while (iter.hasNext()) {
-            Map.Entry entry = (Map.Entry) iter.next();
-            str += entry.getKey().toString() + "=?,";
-        }
-
-        str = str.substring(0, str.length() - 1);
-        String sql = MessageFormat.format("update {0} set {1} where {2}", tableName, str, filter);
-        return executeSql(sql, tableName, hm);
-    }
-
-    private String getFilterString(String tableName, LinkedHashMap<String, Object> hm, String filterColumns) {
-        String[] arr = filterColumns.split(",");
-        int length = arr.length;
-        HashMap columnsType = getTableColumnType(tableName);
-        String filter = "";
-        for (int ii = 0; ii < length; ii++) {
-            String columnType = columnsType.get(arr[ii]).toString();
-            switch (columnType) {
-                case "NUMBER":
-                    filter += "and " + arr[ii] + "=" + hm.get(arr[ii]) + " ";
-                    break;
-                case "DATE":
-                    filter += "and " + arr[ii] + "=to_date('" + hm.get(arr[ii]).toString() + "','yyyy-mm-dd hh24:mi:ss') ";
-                    break;
-                default:
-                    filter += "and " + arr[ii] + "='" + hm.get(arr[ii]).toString() + "' ";
-                    break;
-
-            }
-        }
-        filter = filter.substring(3);
-        return filter;
-    }
-
-
-    public boolean executeSql(String sql, String tableName, LinkedHashMap<String, Object> hm) {
-        boolean bl = false;
-        int i = 0;
-        try {
-            CONN = getConnection();
-            PreparedStatement pstm = CONN.prepareStatement(sql);
-            HashMap columnsType = getTableColumnType(tableName);
-
-            Iterator iter = hm.entrySet().iterator();
-            int index = 0;
-            while (iter.hasNext()) {
-                index += 1;
-                Map.Entry entry = (Map.Entry) iter.next();
-                String key = entry.getKey().toString();
-                Object val = entry.getValue();
-                String columnType = columnsType.get(key).toString();
-                switch (columnType) {
-                    case "NVARCHAR2":
-                    case "NCHAR":
-                    case "VARCHAR2":
-                    case "CHAR":
-                    case "NUMBER":
-                        pstm.setString(index, val.toString());
-                        break;
-                    case "DATE":
-                        Timestamp dateTime = Timestamp.valueOf(val.toString());
-                        pstm.setTimestamp(index, dateTime);
-                        break;
-                    /*case "BLOB":
-                        PSTM.setBlob(index,oracle.sql.BLOB.getEmptyBLOB());
-
-                        break;*/
-                    case "CLOB":
-                        String str = val.toString();
-                        Reader clobReader = new StringReader(str); // 将 text转成流形式
-                        pstm.setClob(index, clobReader, str.length());
-                        break;
-                    default:
-                        pstm.setString(index, val.toString());
-                        break;
-
-                }
-            }
-
-            i = pstm.executeUpdate();
-            if (i > 0)
-                bl = true;
-
-
-        } catch (SQLException e) {
-            String msg = e.getMessage();
-        }
-
-
-        return bl;
-    }
-}

+ 0 - 17
loader/src/main/java/com/gyee/edge/loader/sqlite/config/Database.java

@@ -16,9 +16,6 @@ public class Database {
 
 
     Connection connection = null;
     Connection connection = null;
 
 
-    PreparedStatement preparedStatement = null;
-    ResultSet rs = null;
-
 
 
     public Connection getConnection() throws Exception {
     public Connection getConnection() throws Exception {
         try{
         try{
@@ -35,21 +32,7 @@ public class Database {
 
 
     //释放资源
     //释放资源
     public void releaseResource(){
     public void releaseResource(){
-        if(rs!=null){
-            try{
-                rs.close();
-            }catch (SQLException e){
-                e.printStackTrace();
-            }
-        }
 
 
-        if(preparedStatement!=null){
-            try{
-                preparedStatement.close();
-            }catch (SQLException e){
-                e.printStackTrace();
-            }
-        }
 
 
         if(connection!=null){
         if(connection!=null){
             try{
             try{

BIN
loader/src/main/lib/commons-beanutils-1.8.3.jar


BIN
loader/src/main/lib/commons-logging-1.1.1.jar


BIN
loader/src/main/lib/golden-java-sdk-3.0.27.jar


BIN
loader/src/main/lib/protobuf-java-2.6.1.jar


+ 12 - 0
loader/src/main/resources/application.yaml

@@ -10,7 +10,19 @@ spring:
     username:
     username:
     password:
     password:
 
 
+golden:
+  server_ip: 10.155.32.1
+  #server_ip: 172.168.1.3
+  server_port: 6327
+  user_name: sa
+  password: golden
+  pool_size: 10
+  max_pool_size: 100
+  #单次查询历史原始数据的数量上限
+  query_history_limit: 100000
+
 rocketmq:
 rocketmq:
   namesrvaddr: 127.0.0.1:9876
   namesrvaddr: 127.0.0.1:9876
+  topic: UPWARD_MHS_
 
 
 adapterUrl: http://127.0.0.1:8011/ts
 adapterUrl: http://127.0.0.1:8011/ts

BIN
loader/src/main/resources/dbpointmapper.sqlite3


+ 8 - 2
protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/GYMessage.java

@@ -47,9 +47,15 @@ public class GYMessage implements Serializable {
              buffer.writeBytes(getMessageMutable().toBytes(getAttribute()));
              buffer.writeBytes(getMessageMutable().toBytes(getAttribute()));
          }
          }
          for (MessageData messageData : getMessageDataList()){
          for (MessageData messageData : getMessageDataList()){
-             byte[] bytes = messageData.toBytes(getAttribute(),getMessageMutable().getMessageType());
+             if (getAttribute() == 2 && getAttribute() == 3 && getAttribute() == 6 &&getAttribute() == 7){
+                 byte[] bytes = messageData.toBytes(getAttribute(),getMessageMutable().getMessageType());
+                 buffer.writeBytes(bytes);
+             }
+             else {
+                 byte[] bytes = messageData.toBytes(getAttribute(),messageData.getDataType());
+                 buffer.writeBytes(bytes);
+             }
              sum++;
              sum++;
-             buffer.writeBytes(bytes);
          }
          }
          //替换长度和数据项字节码
          //替换长度和数据项字节码
          ByteBufUtil.setShortBE(buffer,getHeader().length,ByteBufUtil.getBytes(buffer).length);
          ByteBufUtil.setShortBE(buffer,getHeader().length,ByteBufUtil.getBytes(buffer).length);

+ 6 - 5
protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/MessageData.java

@@ -75,7 +75,8 @@ public class MessageData implements Serializable {
         return bytes;
         return bytes;
     }
     }
 
 
-    /** MessageData对象转字节数组   messagetype决定实际长度类型
+    /**
+     *  MessageData对象转字节数组   messagetype决定实际长度类型
         mutable中有值则采用mutable中type的值,
         mutable中有值则采用mutable中type的值,
        mutable中无值则采用messagedata中的datatype。
        mutable中无值则采用messagedata中的datatype。
      */
      */
@@ -126,7 +127,7 @@ public class MessageData implements Serializable {
             for (int i = 0; i < bytes1.length;) {
             for (int i = 0; i < bytes1.length;) {
                 MessageData messageData = new MessageData();
                 MessageData messageData = new MessageData();
                 messageData.setDataType(bytes1[i]);
                 messageData.setDataType(bytes1[i]);
-                messageData.setDataKey(BytesUtils.bytesToInt(bytes1,i+1));
+                messageData.setDataKey(BytesUtils.bytesToInt(bytes1,i+1,3));
                 messageData.setTs(BytesUtils.getLong2(bytes1,i+4));
                 messageData.setTs(BytesUtils.getLong2(bytes1,i+4));
                 if (bytes1[i]==0){
                 if (bytes1[i]==0){
                     messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+12));
                     messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+12));
@@ -173,7 +174,7 @@ public class MessageData implements Serializable {
             System.arraycopy(bytes, 16 + bytes2.length, bytes1, 0, bytes1.length);
             System.arraycopy(bytes, 16 + bytes2.length, bytes1, 0, bytes1.length);
             for (int i = 0; i < bytes1.length;) {
             for (int i = 0; i < bytes1.length;) {
                 MessageData messageData = new MessageData();
                 MessageData messageData = new MessageData();
-                messageData.setDataKey(BytesUtils.bytesToInt(bytes1,i));
+                messageData.setDataKey(BytesUtils.bytesToInt(bytes1,i,3));
                 messageData.setTs(BytesUtils.getLong2(bytes1,i+3));
                 messageData.setTs(BytesUtils.getLong2(bytes1,i+3));
                 if (bytes2[0]==0){
                 if (bytes2[0]==0){
                     messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+11));
                     messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+11));
@@ -220,7 +221,7 @@ public class MessageData implements Serializable {
             for (int i = 0; i < bytes1.length;) {
             for (int i = 0; i < bytes1.length;) {
                 MessageData messageData = new MessageData();
                 MessageData messageData = new MessageData();
                 messageData.setDataType(bytes1[i]);
                 messageData.setDataType(bytes1[i]);
-                messageData.setDataKey(BytesUtils.bytesToInt(bytes1,i+1));
+                messageData.setDataKey(BytesUtils.bytesToInt(bytes1,i+1,3));
                 if (bytes1[i]==0){
                 if (bytes1[i]==0){
                     messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+4));
                     messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+4));
                     i = i + 5;
                     i = i + 5;
@@ -265,7 +266,7 @@ public class MessageData implements Serializable {
             System.arraycopy(bytes, 16 + bytes2.length, bytes1, 0, bytes1.length);
             System.arraycopy(bytes, 16 + bytes2.length, bytes1, 0, bytes1.length);
             for (int i = 0; i < bytes1.length;) {
             for (int i = 0; i < bytes1.length;) {
                 MessageData messageData = new MessageData();
                 MessageData messageData = new MessageData();
-                messageData.setDataKey(BytesUtils.bytesToInt(bytes1,i));
+                messageData.setDataKey(BytesUtils.bytesToInt(bytes1,i,3));
                 if (bytes2[0]==0){
                 if (bytes2[0]==0){
                     messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+3));
                     messageData.setGYFP2_TYPE_BOOLEAN(BytesUtils.getBoolean(bytes1,i+3));
                     i = i + 4;
                     i = i + 4;

+ 4 - 4
protocol/gyfp2/src/main/java/com/gyee/protocol/gyfp2/message/MessageMutable.java

@@ -53,7 +53,7 @@ public class MessageMutable implements Serializable {
                 //获取header头部字节大小
                 //获取header头部字节大小
                 byte[] bytes1 = new byte[3];
                 byte[] bytes1 = new byte[3];
                 System.arraycopy(bytes, 16, bytes1, 0, 3);
                 System.arraycopy(bytes, 16, bytes1, 0, 3);
-                int i = BytesUtils.bytesToInt(bytes1, 0);
+                int i = BytesUtils.bytesToInt(bytes1, 0,3);
                 messageMutable.setKey(i);
                 messageMutable.setKey(i);
             }else if (attribute == 2) {
             }else if (attribute == 2) {
                 byte[] bytes1 = new byte[1];
                 byte[] bytes1 = new byte[1];
@@ -63,7 +63,7 @@ public class MessageMutable implements Serializable {
                 byte[] bytes1 = new byte[4];
                 byte[] bytes1 = new byte[4];
                 System.arraycopy(bytes, 16, bytes1, 0, 4);
                 System.arraycopy(bytes, 16, bytes1, 0, 4);
                 messageMutable.setMessageType(bytes1[0]);
                 messageMutable.setMessageType(bytes1[0]);
-                int i = BytesUtils.bytesToInt(bytes1, 1);
+                int i = BytesUtils.bytesToInt(bytes1, 1,3);
                 messageMutable.setKey(i);
                 messageMutable.setKey(i);
             }else if (attribute == 4) {
             }else if (attribute == 4) {
                 byte[] bytes1 = new byte[8];
                 byte[] bytes1 = new byte[8];
@@ -72,7 +72,7 @@ public class MessageMutable implements Serializable {
             }else if (attribute == 5) {
             }else if (attribute == 5) {
                 byte[] bytes1 = new byte[11];
                 byte[] bytes1 = new byte[11];
                 System.arraycopy(bytes, 16, bytes1, 0, 11);
                 System.arraycopy(bytes, 16, bytes1, 0, 11);
-                int i = BytesUtils.bytesToInt(bytes1, 0);
+                int i = BytesUtils.bytesToInt(bytes1, 0,3);
                 messageMutable.setKey(i);
                 messageMutable.setKey(i);
                 messageMutable.setTs(BytesUtils.getLong2(bytes1, 3));
                 messageMutable.setTs(BytesUtils.getLong2(bytes1, 3));
             }else if (attribute == 6) {
             }else if (attribute == 6) {
@@ -84,7 +84,7 @@ public class MessageMutable implements Serializable {
                 byte[] bytes1 = new byte[12];
                 byte[] bytes1 = new byte[12];
                 System.arraycopy(bytes, 16, bytes1, 0, 12);
                 System.arraycopy(bytes, 16, bytes1, 0, 12);
                 messageMutable.setMessageType(bytes1[0]);
                 messageMutable.setMessageType(bytes1[0]);
-                int i = BytesUtils.bytesToInt(bytes1, 1);
+                int i = BytesUtils.bytesToInt(bytes1, 1,3);
                 messageMutable.setKey(i);
                 messageMutable.setKey(i);
                 messageMutable.setTs(BytesUtils.getLong2(bytes1, 4));
                 messageMutable.setTs(BytesUtils.getLong2(bytes1, 4));
             }
             }