Bladeren bron

调试数据封装程序

xieshengjie 4 jaren geleden
bovenliggende
commit
c216155bc9

+ 17 - 12
golden-realtime-kafka/src/main/java/com/gyee/wisdom/CalculateServer.java

@@ -10,9 +10,6 @@ import com.gyee.ygys.protocol.BitMapMessage;
 import com.gyee.ygys.utils.BytesUtil;
 import com.rtdb.api.callbackInter.RSDataChangeEx;
 import com.rtdb.api.model.RtdbData;
-import com.rtdb.api.util.DateUtil;
-import com.rtdb.enums.DataSort;
-import com.rtdb.model.SearchCondition;
 import com.rtdb.service.impl.ServerImpl;
 import com.rtdb.service.impl.SnapshotImpl;
 import lombok.extern.slf4j.Slf4j;
@@ -21,8 +18,8 @@ import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
+import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -54,7 +51,7 @@ public class CalculateServer {
     private HashMap<Integer, TagPoint> tagPointMap;
 
     @Autowired
-    private KafkaTemplate<String, BitMapMessage> messageKafkaTemplate;
+    private KafkaTemplate<String, byte[]> messageKafkaTemplate;
 
     public boolean start() {
         if (serverStarted) {
@@ -134,9 +131,17 @@ public class CalculateServer {
 
     @Async
     void publishRstdData(RtdbData[] goldenDatas) {
+
+
         BitMapMessage msg = createBitMapMessage(goldenDatas);
+
         //todo: 发送到kafka
-        messageKafkaTemplate.send("NSSFJ", msg);
+        try {
+            //byte[] bytes = msg.toBytes();
+            messageKafkaTemplate.send("NSSFJ", msg.toBytes());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
 
     }
 
@@ -161,10 +166,10 @@ public class CalculateServer {
             if (tpd.getTagPoint().getGroupIndex() != groupIndex) {
                 if (groupIndex != -1) {
                     //重置datasize
-                    byte[] tmp = new byte[dataOffset+1];
-                    System.arraycopy(bitMapGroup.getData(),0, tmp,0, dataOffset+1);
+                    byte[] tmp = new byte[dataOffset];
+                    System.arraycopy(bitMapGroup.getData(),0, tmp,0, dataOffset);
                     bitMapGroup.setData(tmp);
-                    bitMapGroup.setGroupLength(dataOffset+bitMapGroup.getBitMapLength() + 5);
+                    bitMapGroup.setGroupLength(dataOffset+bitMapGroup.getBitMapLength() + 10);
                 }
 
                 //创建新的BitMapGroup
@@ -255,10 +260,10 @@ public class CalculateServer {
         }
 
         //最后一个BitMapGroup修改data
-        byte[] tmp = new byte[dataOffset+1];
-        System.arraycopy(bitMapGroup.getData(),0, tmp,0, dataOffset+1);
+        byte[] tmp = new byte[dataOffset];
+        System.arraycopy(bitMapGroup.getData(),0, tmp,0, dataOffset);
         bitMapGroup.setData(tmp);
-        bitMapGroup.setGroupLength(dataOffset+bitMapGroup.getBitMapLength() + 5);
+        bitMapGroup.setGroupLength(dataOffset+bitMapGroup.getBitMapLength() + 10);
 
         return msg;
     }

+ 12 - 0
golden-realtime-kafka/src/main/java/com/gyee/wisdom/service/CacheService.java

@@ -13,6 +13,7 @@ import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Service
@@ -75,4 +76,15 @@ public class CacheService {
         return  new ArrayList<>();
     }
 
+    public TagPoint getPoint(short groupIndex, int pointIndex) {
+        List<TagPoint> tps = getPointTags();
+
+        List<TagPoint> tags = tps.stream().filter(tp -> tp.getGroupIndex() == groupIndex && tp.getPointIndex() == pointIndex)
+                .collect(Collectors.toList());
+        if (tags !=null && tags.size()>0){
+            return tags.get(0);
+        }
+        return null;
+    }
+
 }