Browse Source

kafka消费程序

songwenbin 4 years ago
parent
commit
992857cbff

+ 14 - 0
kafka-consumer/README.md

@@ -0,0 +1,14 @@
+# golden-realtime-kafka
+    golden实时数据转发到kafka
+    
+
+
+
+
+
+
+
+
+
+
+

+ 34 - 0
kafka-consumer/build.gradle

@@ -0,0 +1,34 @@
+buildscript {
+    repositories {
+        mavenLocal()
+        maven { url "http://maven.aliyun.com/nexus/content/groups/public" }
+        mavenCentral()
+    }
+    dependencies {
+        classpath("$bootGroup:spring-boot-gradle-plugin:$springBootVersion")
+    }
+}
+
+apply plugin: "$bootGroup"
+apply plugin: 'io.spring.dependency-management'
+
+
+dependencies {
+
+    compile project(":common")
+    compile project(":opentsdb-client")
+    compile fileTree(dir: 'src/main/lib', include: '*.jar')
+
+    compile("$bootGroup:spring-boot-starter-web")
+    compile("$bootGroup:spring-boot-starter-undertow")
+    compile("$bootGroup:spring-boot-starter-log4j2")
+
+    compile 'com.alibaba:fastjson:1.2.17'
+    compile 'org.springframework.kafka:spring-kafka'
+
+    compile 'com.opencsv:opencsv:4.5'
+
+    testCompile("$bootGroup:spring-boot-starter-test")
+
+}
+

+ 32 - 0
kafka-consumer/src/main/java/com/gyee/wisdom/Bootstrap.java

@@ -0,0 +1,32 @@
+package com.gyee.wisdom;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.web.servlet.ServletComponentScan;
+import org.springframework.scheduling.annotation.EnableAsync;
+
+/**
+ * @author northriver
+ */
+@SpringBootApplication
+@ServletComponentScan
+@EnableAsync
+public class Bootstrap {
+
+    public static void main(String[] args) {
+        SpringApplication.run(Bootstrap.class, args);
+    }
+
+
+    @KafkaListener(id = "group1", topics = "NSSFJ")
+    public void listen(Object tsDatas) {
+//        logger.info("Received: " + foo);
+//        if (foo.getFoo().startsWith("fail")) {
+//            throw new RuntimeException("failed");
+//        }
+//        this.exec.execute(() -> System.out.println("Hit Enter to terminate..."));
+    }
+
+}
+
+

+ 29 - 0
kafka-consumer/src/main/java/com/gyee/wisdom/config/ConfigProperties.java

@@ -0,0 +1,29 @@
+package com.gyee.wisdom.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+
+@Data
+@Component
+@ConfigurationProperties("calculate.config")
+public class ConfigProperties {
+
+    //数据适配器websocket服务地址
+    private String serviceUrl;
+
+    //离线判定时间间隔,单位毫秒
+    private long offlineInterval = 600000;
+
+    //扫描实时数据线程轮询时间间隔,单位毫秒
+    private int readThreadInterval = 1000;
+
+    //计算状态线程轮询时间间隔,单位毫秒
+    private int calcThreadInterval = 1000;
+
+    //总装机容量
+    private double stationCapacity = 100;
+
+
+}

+ 28 - 0
kafka-consumer/src/main/java/com/gyee/wisdom/model/TagPoint.java

@@ -0,0 +1,28 @@
+package com.gyee.wisdom.model;
+
+import com.opencsv.bean.CsvBindByPosition;
+import lombok.Data;
+
+@Data
+public class TagPoint {
+    //NX_GD_NSSF_FJ_P1_L1_001_AI0001,1350048,FLOAT32,机舱X向振动,001,AI0001,001H
+    @CsvBindByPosition(position = 0,required = true)
+    private String tagCode;
+    @CsvBindByPosition(position = 1,required = true)
+    private String tagId;
+    @CsvBindByPosition(position = 2,required = true)
+    private String dataType;
+    @CsvBindByPosition(position = 3,required = true)
+    private String description;
+    @CsvBindByPosition(position = 4,required = true)
+    private String thingId;
+    @CsvBindByPosition(position = 5,required = true)
+    private String uniformCode;
+    @CsvBindByPosition(position = 6,required = true)
+    private short groupIndex;
+    @CsvBindByPosition(position = 7,required = true)
+    private int pointIndex;
+    @CsvBindByPosition(position = 8,required = true)
+    private int groupSize;
+
+}

+ 28 - 0
kafka-consumer/src/main/java/com/gyee/wisdom/model/TagPointData.java

@@ -0,0 +1,28 @@
+package com.gyee.wisdom.model;
+
+import com.opencsv.bean.CsvBindByPosition;
+import com.rtdb.api.model.RtdbData;
+import lombok.Data;
+
+@Data
+public class TagPointData implements Comparable {
+
+    private TagPoint tagPoint;
+    private RtdbData rtdbData;
+
+    @Override
+    public int compareTo(Object newData) {
+        TagPoint newPoint = ((TagPointData)newData).tagPoint;
+        int result = this.tagPoint.getGroupIndex() - newPoint.getGroupIndex();
+        if(result == 0){
+            result =this.tagPoint.getPointIndex() - newPoint.getPointIndex();
+        }
+        return result;
+    }
+
+    public  TagPointData(TagPoint tp, RtdbData rd) {
+        tagPoint = tp;
+        rtdbData = rd;
+    }
+
+}

+ 78 - 0
kafka-consumer/src/main/java/com/gyee/wisdom/service/CacheService.java

@@ -0,0 +1,78 @@
+package com.gyee.wisdom.service;
+
+import com.gyee.wisdom.model.TagPoint;
+import com.opencsv.bean.CsvToBeanBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+import org.springframework.stereotype.Service;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+@Slf4j
+@Service
+public class CacheService {
+
+    private int[] tagIds;
+
+    public int[] getTagIds() {
+        if (tagIds == null)
+        {
+            List<TagPoint> tps = getPointTags();
+            if (tps != null && tps.size() > 0) {
+                tagIds = tps.stream().mapToInt(t-> Integer.parseInt(t.getTagId())).toArray();
+            }
+        }
+
+        return tagIds;
+    }
+
+    private HashMap<Integer, TagPoint> tagPointMap;
+
+    public HashMap<Integer, TagPoint> getTagPointMap() {
+        if (tagPointMap == null) {
+            List<TagPoint> tps = getPointTags();
+            tagPointMap = new HashMap<>();
+            if (tps != null && tps.size() > 0) {
+                for(TagPoint tp : tps) {
+                    Integer id = Integer.parseInt(tp.getTagId());
+                    if (tagPointMap.containsKey(id) == false)
+                        tagPointMap.put(id, tp);
+                }
+            }
+        }
+
+        return  tagPointMap;
+    }
+
+
+    private List<TagPoint> pointTags;
+
+    public List<TagPoint> getPointTags() {
+        if (pointTags == null)
+            pointTags = createPointTags();
+
+        return pointTags;
+    }
+
+    private List<TagPoint> createPointTags() {
+        try {
+            Resource resource = new ClassPathResource("tag-point.csv");
+            InputStream ins = resource.getInputStream();
+            BufferedReader reader = new BufferedReader(new InputStreamReader(ins, "UTF-8"));
+            return new CsvToBeanBuilder(reader)
+                    .withType(TagPoint.class).withSeparator(',').build().parse();
+
+        } catch (Exception ex) {
+            log.error(ex.getMessage());
+        }
+
+        return  new ArrayList<>();
+    }
+
+}

+ 16 - 0
kafka-consumer/src/main/resources/application.yaml

@@ -0,0 +1,16 @@
+server:
+  port: 8068
+
+spring:
+  application:
+    name: ygys-golden-latest
+  kafka:
+    bootstrap-servers: 172.168.5.61:9092
+    kafka.topic-name: myTopic
+    kafka.consumer.group.id: test-consumer-group
+    producer:
+      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+
+
+
+

+ 9 - 0
kafka-consumer/src/main/resources/banner.txt

@@ -0,0 +1,9 @@
+                                      __            _____ __
+ ___.__. ____ ___.__. ______         |  | _______ _/ ____\  | _______              ____  ____   ____   ________ __  _____   ___________
+<   |  |/ ___<   |  |/  ___/  ______ |  |/ /\__  \\   __\|  |/ /\__  \    ______ _/ ___\/  _ \ /    \ /  ___/  |  \/     \_/ __ \_  __ \
+ \___  / /_/  >___  |\___ \  /_____/ |    <  / __ \|  |  |    <  / __ \_ /_____/ \  \__(  <_> )   |  \\___ \|  |  /  Y Y  \  ___/|  | \/
+ / ____\___  // ____/____  >         |__|_ \(____  /__|  |__|_ \(____  /          \___  >____/|___|  /____  >____/|__|_|  /\___  >__|
+ \/   /_____/ \/         \/               \/     \/           \/     \/               \/           \/     \/            \/     \/
+
+
+ :: ygys-kafka-consumer ::                    version 1.0.0

+ 64 - 0
kafka-consumer/src/main/resources/log4j2.xml

@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<Configuration status="WARN">
+    <Properties>
+        <Property name="Pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %5p %t %M(%F:%L) %m%n</Property>
+    </Properties>
+    <Filter type="ThresholdFilter" level="INFO"/>
+
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="${Pattern}"/>
+        </Console>
+        <RollingFile name="RollingFileInfo" fileName="logs/info.log"
+                     filePattern="logs/%d{yyyy-MM}/info-%d{yyyy-MM-dd}.%i.log">
+            <PatternLayout pattern="${Pattern}"/>
+            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
+            <Policies>
+                <TimeBasedTriggeringPolicy/>
+            </Policies>
+            <DefaultRolloverStrategy>
+                <Delete basePath="${baseDir}" maxDepth="2">
+                    <IfFileName glob="*/*.log" />
+                    <IfLastModified age="24H" />
+                </Delete>
+            </DefaultRolloverStrategy>
+        </RollingFile>
+        <RollingFile name="RollingFileWarn" fileName="logs/warn.log"
+                     filePattern="logs/%d{yyyy-MM}/warn-%d{yyyy-MM-dd}.%i.log">
+            <PatternLayout pattern="${Pattern}"/>
+            <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY"/>
+            <Policies>
+                <TimeBasedTriggeringPolicy/>
+            </Policies>
+            <DefaultRolloverStrategy>
+                <Delete basePath="${baseDir}" maxDepth="2">
+                    <IfFileName glob="*/*.log" />
+                    <IfLastModified age="24H" />
+                </Delete>
+            </DefaultRolloverStrategy>
+        </RollingFile>
+        <RollingFile name="RollingFileError" fileName="logs/error.log"
+                     filePattern="logs/%d{yyyy-MM}/error-%d{yyyy-MM-dd}.%i.log">
+            <PatternLayout pattern="${Pattern}"/>
+            <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
+            <Policies>
+                <TimeBasedTriggeringPolicy/>
+            </Policies>
+            <DefaultRolloverStrategy>
+                <Delete basePath="${baseDir}" maxDepth="2">
+                    <IfFileName glob="*/*.log" />
+                    <IfLastModified age="24H" />
+                </Delete>
+            </DefaultRolloverStrategy>
+        </RollingFile>
+    </Appenders>
+
+    <Loggers>
+        <Root level="WARN">
+            <AppenderRef ref="Console"/>
+         <!-- <appender-ref ref="RollingFileInfo"/>
+            <appender-ref ref="RollingFileWarn"/>-->
+            <appender-ref ref="RollingFileError"/>
+        </Root>
+    </Loggers>
+</Configuration>

File diff suppressed because it is too large
+ 109494 - 0
kafka-consumer/src/main/resources/tag-point.csv