xieshengjie 4 years ago
parent
commit
6d64212356

+ 5 - 8
kafka-consumer/src/main/java/com/gyee/wisdom/Bootstrap.java

@@ -18,14 +18,11 @@ public class Bootstrap {
     }
 
 
-    @KafkaListener(id = "group1", topics = "NSSFJ")
-    public void listen(Object tsDatas) {
-//        logger.info("Received: " + foo);
-//        if (foo.getFoo().startsWith("fail")) {
-//            throw new RuntimeException("failed");
-//        }
-//        this.exec.execute(() -> System.out.println("Hit Enter to terminate..."));
-    }
+//    @KafkaListener(id = "group1", topics = "NSSFJ")
+//    public void listen(Object tsDatas) {
+//        System.out.println(tsDatas);
+//    }
+
 
 }
 

+ 79 - 0
kafka-consumer/src/main/java/com/gyee/wisdom/KafkaConsumer.java

@@ -0,0 +1,79 @@
+package com.gyee.wisdom;
+
+import com.gyee.wisdom.model.TagPoint;
+import com.gyee.wisdom.service.CacheService;
+import com.gyee.ygys.protocol.BitMapGroup;
+import com.gyee.ygys.protocol.BitMapMessage;
+import com.gyee.ygys.protocol.BitMapMessageParser;
+import com.gyee.ygys.utils.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.PartitionOffset;
+import org.springframework.kafka.annotation.TopicPartition;
+import org.springframework.stereotype.Component;
+
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @author xiesj
+ * @version 1.0
+ * @date 1
+ */
+@Component
+public class KafkaConsumer {
+
+    @Autowired
+    private CacheService cacheService;
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
+
+    @KafkaListener(groupId = "group1",topicPartitions = {@TopicPartition(topic="NSSFJ",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "1"))})
+    public void receive(byte[] message){
+        LOG.info("KafkaMessageConsumer 接收到消息:"+message);
+        Map<TagPoint, Double> tagPointDoubleMap = parseByteArrayFromKafka(message);
+    }
+
+
+    public Map<TagPoint,Double> parseByteArrayFromKafka(byte[] message){
+        Map<TagPoint,Double> valueMap = new HashMap<>();
+        BitMapMessage bitMapMessage = BitMapMessageParser.parserBitMapMessage(message);
+        BitSet bitMapL1 = bitMapMessage.getBitMapL1();
+        for(int i=0;i<bitMapL1.size();i++){
+            if(bitMapL1.get(i)){
+                int groupIndex = i;
+                List<BitMapGroup> filterGroups = bitMapMessage.getGroups().stream().filter(bitMapGroup -> bitMapGroup.getGroupIndex() == groupIndex).collect(Collectors.toList());
+                for (int j=0;j<filterGroups.size();j++){
+                    BitMapGroup bitMapGroup = filterGroups.get(j);
+                    BitSet bitMapL2 = bitMapGroup.getBitMapL2();
+                    int dataOffset = 0;
+                    byte[] data = bitMapGroup.getData();
+                    for(int x=0;x<bitMapL2.size();x++){
+                        if (bitMapL2.get(x)){
+                            TagPoint tp = cacheService.getPoint(bitMapGroup.getGroupIndex(), x);//x代表点的索引
+                            byte[] valueArray = new byte[8];
+                            if (tp.getDataType().equals("BOOL")){
+                                System.arraycopy(data,dataOffset,valueArray,0,1);
+                                dataOffset+=1;
+                            }else{
+                                System.arraycopy(data,dataOffset,valueArray,0,8);
+                                dataOffset+=8;
+                            }
+                            double value = BytesUtil.byte2Double(valueArray);
+                            valueMap.put(tp,value);
+                        }
+                    }
+
+                }
+            }
+        }
+        return valueMap;
+    }
+
+}

+ 5 - 7
kafka-consumer/src/main/java/com/gyee/wisdom/model/TagPointData.java

@@ -1,14 +1,12 @@
 package com.gyee.wisdom.model;
 
-import com.opencsv.bean.CsvBindByPosition;
-import com.rtdb.api.model.RtdbData;
 import lombok.Data;
 
 @Data
 public class TagPointData implements Comparable {
 
     private TagPoint tagPoint;
-    private RtdbData rtdbData;
+//    private RtdbData rtdbData;
 
     @Override
     public int compareTo(Object newData) {
@@ -20,9 +18,9 @@ public class TagPointData implements Comparable {
         return result;
     }
 
-    public  TagPointData(TagPoint tp, RtdbData rd) {
-        tagPoint = tp;
-        rtdbData = rd;
-    }
+//    public  TagPointData(TagPoint tp, RtdbData rd) {
+//        tagPoint = tp;
+//        rtdbData = rd;
+//    }
 
 }

+ 10 - 0
kafka-consumer/src/main/java/com/gyee/wisdom/service/CacheService.java

@@ -13,6 +13,7 @@ import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Service
@@ -74,5 +75,14 @@ public class CacheService {
 
         return  new ArrayList<>();
     }
+    public TagPoint getPoint(short groupIndex, int pointIndex) {
+        List<TagPoint> tps = getPointTags();
 
+        List<TagPoint> tags = tps.stream().filter(tp -> tp.getGroupIndex() == groupIndex && tp.getPointIndex() == pointIndex)
+                .collect(Collectors.toList());
+        if (tags !=null && tags.size()>0){
+            return tags.get(0);
+        }
+        return null;
+    }
 }

+ 17 - 6
kafka-consumer/src/main/resources/application.yaml

@@ -5,12 +5,23 @@ spring:
   application:
     name: ygys-golden-latest
   kafka:
-    bootstrap-servers: 172.168.5.62:9092
-    kafka.topic-name: myTopic
-    kafka.consumer.group.id: test-consumer-group
-    producer:
-      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
-
+    consumer:
+      bootstrap-servers: 172.168.5.62:9092
+      # default consumer group
+      group-id: group1
+      # start from the earliest unconsumed offset
+      auto-offset-reset: earliest
+      # maximum number of records fetched in a single poll
+      max-poll-records: 1000
+      # auto-commit interval in milliseconds
+      auto-commit-interval: 1000
+      enable-auto-commit: true
 
+#    bootstrap-servers: 172.168.5.62:9092
+#    kafka.topic-name: NSSFJ
+#    kafka.consumer.group.id: test-consumer-group
+#
+#    producer:
+#      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer