Browse Source

电网光功率102传输协议实现

songwenbin 1 year ago
parent
commit
156aa79679
38 changed files with 2652 additions and 0 deletions
  1. 17 0
      gdnxak102/README.md
  2. 26 0
      gdnxak102/build.gradle
  3. 122 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/ApplicationBootstrap.java
  4. 103 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeClient.java
  5. 70 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeClientHandler.java
  6. 28 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeClientInitializer.java
  7. 30 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeDecoder.java
  8. 28 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeEncoder.java
  9. 15 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeMessage.java
  10. 78 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/GYMessage.java
  11. 39 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/KeepMessage.java
  12. 44 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/config/AkConfig.java
  13. 14 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/forcast/ForcastModel.java
  14. 161 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/forcast/ForcastService.java
  15. 9 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/forcast/TelemetryData.java
  16. 70 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/BasicInstruction102.java
  17. 63 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/FrameDecoder.java
  18. 187 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/IEC102ClientOld.java
  19. 154 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102Client.java
  20. 72 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102ClientHandler.java
  21. 26 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102ClientInitializer.java
  22. 71 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102Decoder.java
  23. 32 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102Encoder.java
  24. 6 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102FrameType.java
  25. 63 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102Message.java
  26. 27 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/SessionState.java
  27. 32 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/ClientHandlersInitializer.java
  28. 30 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/ClientIdleStateTrigger.java
  29. 52 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/ExponentialBackOffRetry.java
  30. 64 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/Pinger.java
  31. 59 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/ReconnectHandler.java
  32. 21 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/RetryPolicy.java
  33. 66 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/TcpClient.java
  34. 422 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/utils/ByteUtil.java
  35. 62 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/utils/SpringContextUtil.java
  36. 261 0
      gdnxak102/src/main/java/com/gyee/edge/gdnxak/utils/Util.java
  37. 20 0
      gdnxak102/src/main/resources/application.yaml
  38. 8 0
      gdnxak102/src/main/resources/banner.txt

+ 17 - 0
gdnxak102/README.md

@@ -0,0 +1,17 @@
+# gdnxak102
+
+
+## 国电宁夏大武口埃肯光功率数据采集
+
+### 数据源: 南瑞光功率预测系统
+### 通信协议: IEC102 + 光功率文本
+
+
+
+
+
+
+
+
+
+

+ 26 - 0
gdnxak102/build.gradle

@@ -0,0 +1,26 @@
+buildscript {
+    repositories {
+        mavenLocal()
+        maven {
+            allowInsecureProtocol = true
+            url "http://maven.aliyun.com/nexus/content/groups/public" }
+        mavenCentral()
+    }
+    dependencies {
+        classpath("$bootGroup:spring-boot-gradle-plugin:$springBootVersion")
+    }
+}
+
+apply plugin: "$bootGroup"
+apply plugin: 'io.spring.dependency-management'
+
+dependencies {
+    implementation project(":common:utils")
+    implementation("io.netty:netty-all:$nettyVersion")
+    implementation("$bootGroup:spring-boot-starter:$springBootVersion")
+    implementation("com.google.code.gson:gson:$gsonVersion")
+    implementation("org.apache.logging.log4j:log4j-core:$log4jVersion")
+    implementation("org.apache.logging.log4j:log4j-jul:$log4jVersion")
+    implementation("org.apache.logging.log4j:log4j-api:$log4jVersion")
+    implementation("org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion")
+}

+ 122 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/ApplicationBootstrap.java

@@ -0,0 +1,122 @@
+package com.gyee.edge.gdnxak;
+
+import com.gyee.edge.gdnxak.bridge.BridgeClient;
+import com.gyee.edge.gdnxak.bridge.GYMessage;
+import com.gyee.edge.gdnxak.config.AkConfig;
+import com.gyee.edge.gdnxak.forcast.ForcastModel;
+import com.gyee.edge.gdnxak.forcast.ForcastService;
+import com.gyee.edge.gdnxak.iec102.Iec102Client;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+@Slf4j
+@SpringBootApplication()
+public class ApplicationBootstrap implements CommandLineRunner {
+
+    @Autowired
+    AkConfig akConfig;
+
+    @Autowired
+    ForcastService forcastService;
+
+    Iec102Client iec102Client;
+    BridgeClient bridgeClient;
+
+    public static void main(String[] args) {
+        SpringApplication.run(ApplicationBootstrap.class, args);
+    }
+
+
+    @Override
+    public void run(String... args) throws Exception {
+        log.info("国电宁夏光功率102数据采集程序启动..............");
+        log.info("config : " + akConfig.toString());
+
+        try {
+            while (true) {
+                try {
+                    //检查iec102连接
+                    checkIec102Client();
+
+                    //检查bridge连接
+                    checkBridgeClient();
+
+                    Thread.sleep(akConfig.getHeartbeatInterval());
+                } catch (Exception e){
+                    log.error(e.getMessage());
+                }
+            }
+        } finally {
+            if (iec102Client != null)
+                iec102Client.close();
+
+            if (bridgeClient != null)
+                bridgeClient.close();
+
+        }
+    }
+
+    private void checkIec102Client() {
+        //如果没有,就创建客户端对象
+        if(iec102Client == null) {
+            log.info("iec102Client 初始化对象...");
+            iec102Client = new Iec102Client(akConfig.getIec102IP(), akConfig.getIec102Port());
+        }
+
+        if(iec102Client.isConnected() == false) {
+            iec102Client.connect();
+            //todo: 重连机制的优化
+        }
+
+        if(iec102Client.isConnected()) {
+            //发送心跳信号
+            iec102Client.sendHeartBeat();
+
+            //发送召唤用户数据指令
+            iec102Client.callUserData();
+        }
+
+    }
+
+    private void checkBridgeClient() {
+        //如果没有,就创建客户端对象
+        if(bridgeClient == null)
+            bridgeClient = new BridgeClient(akConfig.getBridgeIP(), akConfig.getBridgePort());
+
+        if(bridgeClient.isConnected() == false) {
+            bridgeClient.connect();
+            //todo: 重连机制的优化
+        }
+
+        if(bridgeClient.isConnected()) {
+
+            bridgeClient.sendKeepMessage();
+
+            //todo: 此处从缓存中读取数据
+            ForcastModel forcastModel = forcastService.getForcastModel();
+            while(forcastModel != null) {
+                GYMessage msg = forcastService.ForcastModel2GYMessage(forcastModel);
+                if (msg != null) {
+                    bridgeClient.sendGYMessage(msg);
+                }
+                forcastModel = forcastService.getForcastModel();
+            }
+
+            //发送功率预测数据
+//            GYMessage msg = forcastService.getGYMessage();
+//            if (msg != null) {
+//                bridgeClient.sendGYMessage(msg);
+//            } else {
+//                //发送心跳信号
+//                bridgeClient.sendKeepMessage();
+//            }
+
+        }
+
+    }
+
+}

+ 103 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeClient.java

@@ -0,0 +1,103 @@
+package com.gyee.edge.gdnxak.bridge;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Date;
+
+@Slf4j
+public class BridgeClient {
+
+    private String host;
+    private int port;
+    private EventLoopGroup group;
+    private Bootstrap bootstrap;
+    private boolean connected = false;
+    private Date lastActiveTime;
+
+    /** 将<code>Channel</code>保存起来, 可用于在其他非handler的地方发送数据 */
+    private Channel channel;
+
+    public BridgeClient(String host, int port) {
+        this.host = host;
+        this.port = port;
+        group = new NioEventLoopGroup();
+        // bootstrap 可重用, 只需在TcpClient实例化的时候初始化即可.
+        bootstrap = new Bootstrap();
+        bootstrap.group(group)
+                .channel(NioSocketChannel.class)
+                .handler(new BridgeClientInitializer(BridgeClient.this));
+    }
+
+    /**
+     * 向远程TCP服务器请求连接
+     */
+    public void connect() {
+        synchronized (bootstrap) {
+            ChannelFuture future = bootstrap.connect(host, port);
+            future.addListener(getConnectionListener());
+            this.channel = future.channel();
+            sleep(3000);
+        }
+    }
+
+    private ChannelFutureListener getConnectionListener() {
+        return new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (!future.isSuccess()) {
+                    future.channel().pipeline().fireChannelInactive();
+                    connected = false;
+                } else
+                    connected = true;
+            }
+        };
+    }
+
+    public boolean isConnected() {
+        return connected;
+    }
+
+    public void setConnected(boolean bValue) {
+        connected = bValue;
+    }
+
+
+    public void sendKeepMessage() {
+        GYMessage gyMessage = new GYMessage();
+        gyMessage.setKeepFlag(true);
+        gyMessage.setKeepMessage(new KeepMessage(KeepMessage.M_KEEP));
+        sendGYMessage(gyMessage);
+    }
+
+    public synchronized void sendGYMessage(GYMessage msg) {
+        if (channel != null) {
+            channel.writeAndFlush(msg);
+            sleep(1000);
+        }
+    }
+
+    public void close() {
+        if (group != null)
+            group.shutdownGracefully();
+    }
+
+    public void processMessage(KeepMessage kMsg) {
+       // log.info(kMsg.toHexString());
+        lastActiveTime = new Date();
+    }
+
+    private void sleep(int millionSencond) {
+        try {
+            Thread.sleep(millionSencond);
+        } catch (Exception e) {}
+
+    }
+
+}

+ 70 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeClientHandler.java

@@ -0,0 +1,70 @@
+package com.gyee.edge.gdnxak.bridge;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+
+@Slf4j
+@ChannelHandler.Sharable
+public class BridgeClientHandler extends SimpleChannelInboundHandler<KeepMessage> {
+
+	private BridgeClient bridgeClient;
+
+	public BridgeClientHandler(BridgeClient bridgeClient) {
+		this.bridgeClient = bridgeClient;
+	}
+
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+
+		bridgeClient.setConnected(true);
+		log.info("建立到Bridge的新连接,clientid = " + getClientId(ctx));
+
+	}
+
+	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+		bridgeClient.setConnected(false);
+		log.info("Bridge连接断开,clientid =" + getClientId(ctx));
+	}
+	
+	@Override
+	public void channelRead0(ChannelHandlerContext ctx, KeepMessage msg) throws IOException {
+		//log.debug("收到消息:" + msg.getHexString());
+		bridgeClient.processMessage(msg);
+
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+
+		bridgeClient.setConnected(false);
+
+		cause.printStackTrace();
+
+		log.info("102连接异常断开,clientid =" + getClientId(ctx));
+
+		//todo: 异常处理
+		//ctx.close();
+	}
+
+	private String getClientId(ChannelHandlerContext ctx) {
+		try {
+			InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
+			String clientIp = ipSocket.getAddress().getHostAddress();
+			int clientPort = ipSocket.getPort();
+			return clientIp + ":" + clientPort;
+		} catch (Exception e) {
+			log.info("获取连接信息异常!");
+			return "";
+		}
+
+	}
+
+}

+ 28 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeClientInitializer.java

@@ -0,0 +1,28 @@
+
+package com.gyee.edge.gdnxak.bridge;
+
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+
+public class BridgeClientInitializer extends ChannelInitializer<SocketChannel> {
+
+    private BridgeClientHandler bridgeClientHandler;
+
+    public BridgeClientInitializer(BridgeClient bridgeClient) {
+        //   Assert.notNull(tcpClient, "TcpClient can not be null.");
+        this.bridgeClientHandler = new BridgeClientHandler(bridgeClient);
+    }
+
+    @Override
+    public void initChannel(SocketChannel ch) {
+        ChannelPipeline pipeline = ch.pipeline();
+
+        //添加编解码器
+        pipeline.addLast(new BridgeEncoder());
+        pipeline.addLast(new BridgeDecoder());
+
+        pipeline.addLast(bridgeClientHandler);
+    }
+}

+ 30 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeDecoder.java

@@ -0,0 +1,30 @@
+package com.gyee.edge.gdnxak.bridge;
+
+import com.gyee.edge.gdnxak.utils.ByteUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+
+/**
+ * 通常服务端给客户端的返回消息只有一个字节
+ * 响应消息是:0xFF
+ * 主动消息:0x00
+ */
+@Slf4j
+public class BridgeDecoder extends ByteToMessageDecoder {
+	
+	@Override
+	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+		byte[] data = new byte[in.readableBytes()];
+		in.readBytes(data);
+		log.info("收到报文:" + ByteUtil.byteArrayToHexString(data));
+
+		KeepMessage msg = new KeepMessage(data);
+		out.add(msg);
+	}
+}

+ 28 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeEncoder.java

@@ -0,0 +1,28 @@
+package com.gyee.edge.gdnxak.bridge;
+
+import com.gyee.edge.gdnxak.utils.ByteUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class BridgeEncoder extends MessageToByteEncoder<GYMessage> {
+
+	@Override
+	protected void encode(ChannelHandlerContext ctx, GYMessage msg, ByteBuf out) throws Exception {
+		try {
+			if (msg != null) {
+				out.writeBytes(msg.toBytes());
+				log.info("发送报文:" + ByteUtil.byteArrayToHexString(msg.toBytes()));
+			} else {
+				log.error("无效的消息格式!");
+			}
+		} catch (Exception e) {
+			throw e;
+		}
+
+	}
+
+}

+ 15 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/BridgeMessage.java

@@ -0,0 +1,15 @@
+package com.gyee.edge.gdnxak.bridge;
+
+
+import lombok.Data;
+
+/**
+ *  Bridge消息
+ */
+@Data
+public class BridgeMessage {
+
+	//帧头
+	private byte[] data;
+
+}

+ 78 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/GYMessage.java

@@ -0,0 +1,78 @@
+package com.gyee.edge.gdnxak.bridge;
+
+import com.gyee.edge.common.utils.BytesUtils;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * @author :
+ * @date :Created in 2022/8/25 17:34
+ * @description:  定义gyfp2协议实体类
+ */
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class GYMessage implements Serializable {
+
+    private byte[] header=new byte[]{'G','Y','F','P','V','2','0','0'};
+    //长度 = 28 + 项数*4
+    //项数 = 数值数组的长度
+    private short length;
+    private short count;
+    private int attribute = 7;
+    private byte dataType = 0x0A;
+
+    private int addressIndex;
+    private long ts;
+    private float[] values;
+
+
+    private boolean keepFlag = false;
+    private KeepMessage keepMessage;
+
+
+
+     public byte[] toBytes(){
+
+         if (keepFlag && keepMessage != null) {
+             return keepMessage.getData();
+         }
+
+          /*不指定默认256,指定64以内默认大小都是64.
+         如果写入后数据大小未超过512个字节,则选择下一个16的整数倍进行扩容。比如写入数据后大小为12,则扩容后的capacity是16。
+         如果写入后数据大小超过512个字节,则选择下一个2n。比如写入后大小是512字节,则扩容后的capacity是210=1024。(因为29=512,长度已经不够了)
+         扩容不能超过max capacity,否则会报错*/
+         int sum = 0;
+         count = (short) values.length;
+         length = (short)(count * 4 + 28);
+
+         ByteBuf buffer = Unpooled.buffer(10);
+         buffer.writeBytes(getHeader());
+         //占住总长度字节大小,方便后续代替
+         buffer.writeShort(length);
+         //占住数据项个数大小,方便后续代替
+         buffer.writeShort(count);
+         //todo:此处attribute固定为0
+         buffer.writeInt(attribute);
+         //todo: 写数据类型,此处固定为10
+         buffer.writeBytes(new byte[] {(byte)dataType});
+         //todo: 写起始地址
+         buffer.writeBytes(BytesUtils.intToBytes(addressIndex,3));
+         //todo: 写时间戳
+         buffer.writeLong(ts);
+         for(float v : values) {
+             buffer.writeFloatLE(v);
+         }
+
+         return ByteBufUtil.getBytes(buffer);
+     }
+
+}

+ 39 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/bridge/KeepMessage.java

@@ -0,0 +1,39 @@
+package com.gyee.edge.gdnxak.bridge;
+
+
+import com.gyee.edge.gdnxak.iec102.Iec102FrameType;
+import com.gyee.edge.gdnxak.utils.ByteUtil;
+import lombok.Data;
+
+/**
+ *  Bridge通讯保持消息,相当于心跳信号
+ *  正向 keep-alive 发送的 flag 是字符串”KEEP”。
+ * 服务器发送一个字节进行应答,这个字节可以配置,典型值是 0xFF。
+ *  反向 keep-alive:
+ * 服务器发送一个字节的请求,这个字节可以配置,但是不能和服务器的正向 keep-alive 应答
+ * 相同,典型值是 0x00。
+ * 客户端发送KACK应答
+ */
+@Data
+public class KeepMessage {
+
+	//帧头
+	private byte[] data;
+
+	public KeepMessage(byte[] bytes) {
+		this.data = bytes;
+	}
+
+	public static byte[] M_KEEP = new byte[] {'K','E','E','P'};
+
+	public static byte[] M_KEEP_ACK = new byte[] {(byte)0xFF};
+
+	public static byte[] C_KEEP = new byte[] {(byte)0x00};
+
+	public static byte[] C_KEEP_ACK = new byte[] {'K','A','C','K'};
+
+	public String toHexString() {
+		return ByteUtil.byteArrayToHexString(data);
+	}
+
+}

+ 44 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/config/AkConfig.java

@@ -0,0 +1,44 @@
+package com.gyee.edge.gdnxak.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * 104规约的配置
+ */
+@Data
+@Component
+@ConfigurationProperties("iec102")
+public class AkConfig {
+
+    //102服务Ip地址
+    private String iec102IP;
+    //102服务端口
+    private short iec102Port = 2102;
+    //102服务轮询时间间隔
+    private int scanInterval = 5000;
+    //心跳监测时间间隔
+    private int heartbeatInterval = 10000;
+    //消息缓存最大数量
+    private int maxCacheCount = 200;
+    //光耀采集 bridge 服务ip
+    private String bridgeIP;
+    //bridge 服务端口
+    private short bridgePort = 9888;
+
+    //超短期功率预测点起始地址
+    private int cdqAddressStart = 0;
+    //短期功率预测点起始地址
+    private int dqAddressStart = 16;
+    //测风塔遥测点起始地址
+    private int cftAddressStart = 304;
+
+
+    @Override
+    public String toString() {
+        return "iec102 address = " + iec102IP + ", " + iec102Port +
+                ", bridge address = " + bridgeIP + ", " + bridgePort;
+    }
+
+}

+ 14 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/forcast/ForcastModel.java

@@ -0,0 +1,14 @@
+package com.gyee.edge.gdnxak.forcast;
+
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.Date;
+
+@Data
+public class ForcastModel {
+    public String dataType;
+    public Date reportTime;
+    public ArrayList<TelemetryData> forcastData;
+
+}

+ 161 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/forcast/ForcastService.java

@@ -0,0 +1,161 @@
+package com.gyee.edge.gdnxak.forcast;
+
+import com.gyee.edge.gdnxak.bridge.GYMessage;
+import com.gyee.edge.gdnxak.config.AkConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * 解析功率预测报文
+ * 缓存报文信息,作为102读取和bridge转发的中转
+ */
+@Component
+public class ForcastService {
+
+    @Autowired
+    AkConfig akConfig;
+
+    private int maxCacheCount = 200;
+
+    private ArrayBlockingQueue<ForcastModel> forcastQueue = new ArrayBlockingQueue<>(maxCacheCount);
+
+    public void putForcastModel(ForcastModel forcastModel) {
+        //如果队列已满,移除头部元素,腾出空间添加新元素
+        if (forcastQueue.offer(forcastModel) == false) {
+            forcastQueue.poll();
+            forcastQueue.offer(forcastModel);
+        }
+    }
+
+    public ForcastModel getForcastModel() {
+        return forcastQueue.poll();
+    }
+
+    public ForcastModel readForcastFileString(String fileString) {
+        /*
+         *解析字符串内容
+         */
+        ForcastModel ret = new ForcastModel();
+        ret.setForcastData(new ArrayList<>());
+        String xmlFlag = null;
+        //按行分割字符串得到lineArray数组
+        String[] lineArray = fileString.split("\r\n");
+        //遍历数组lineArray
+        for (int i = 0; i < lineArray.length; i++) {
+            //每行字符串line
+            String line = lineArray[i];
+            //解析type和time,例子:<! Entity=ZWS05F type=CDQ time='2023-02-27_17:15' !>
+            if (line.startsWith("<!")) {
+                //每行字符串按空格分割为fieldArray数组
+                String[] fieldArray = line.split(" ");
+                //遍历fieldArray数组,取出type和时间
+                for (int j = 0; j < fieldArray.length; j++) {
+                    String field = fieldArray[j];
+                    if (field.startsWith("type")) {
+                        ret.setDataType(field.substring(5));
+                    }
+                    if (field.startsWith("time")) {
+                        String substring = field.substring(6, field.length() - 1);
+                        SimpleDateFormat Format = new SimpleDateFormat("yyyy-MM-dd_HH:mm");
+                        Date date;
+                        try {
+                            date = Format.parse(substring);
+                        } catch (ParseException e) {
+                            throw new RuntimeException(e);
+                        }
+                        ret.setReportTime(date);
+                    }
+                }
+            }
+
+            if (line.startsWith("<")) {
+                String[] fieldArray = line.split("::");
+                xmlFlag = fieldArray[0].substring(1);
+            }
+
+            //解析功率数据
+            if (line.startsWith("#") && (Objects.equals(xmlFlag, "UltraShortTermForcast_V2P")
+                    || Objects.equals(xmlFlag, "ShortTermForcast"))) {
+                String[] fieldArray = line.split("   ");
+                if (fieldArray.length > 1) {
+                    TelemetryData td = new TelemetryData();
+                    td.setKey(fieldArray[0]);
+                    td.setValue(Double.parseDouble(fieldArray[1]));
+                    ret.getForcastData().add(td);
+                }
+            }
+
+            //解析测风塔数据
+            if (line.startsWith("#") && (Objects.equals(xmlFlag, "MastData"))) {
+                String[] fieldArray = line.split(" ");
+                if (fieldArray.length > 1) {
+                    TelemetryData td = new TelemetryData();
+                    td.setKey(fieldArray[1]);
+                    td.setValue(Double.parseDouble(fieldArray[2]));
+                    ret.getForcastData().add(td);
+                   // System.out.println(td);
+                }
+            }
+        }
+        return ret;
+    }
+
+    public GYMessage ForcastModel2GYMessage(ForcastModel forcastModel) {
+        GYMessage gyMessage = new GYMessage();
+        if("CDQ".equals(forcastModel.getDataType())) {
+            gyMessage.setAddressIndex(akConfig.getCdqAddressStart());
+        } else if ("DQ".equals(forcastModel.getDataType())) {
+            gyMessage.setAddressIndex(akConfig.getDqAddressStart());
+        } else if ("CFT".equals(forcastModel.getDataType())) {
+            gyMessage.setAddressIndex(akConfig.getCftAddressStart());
+        } else {
+            //无效的采集数据类型
+            return gyMessage;
+        }
+
+        gyMessage.setTs(forcastModel.getReportTime().getTime());
+
+        float[] vals = new float[forcastModel.getForcastData().size()];
+        int index = 0;
+
+        for(TelemetryData td : forcastModel.getForcastData()) {
+            float val = (float) td.value;
+            vals[index] = val;
+            index++;
+        }
+
+        gyMessage.setValues(vals);
+
+        return gyMessage;
+    }
+
+    //这是一个测试方法
+    private ForcastModel createForcastModel() {
+        ForcastModel result = new ForcastModel();
+        result.setDataType("CDQ");
+        result.setReportTime(new Date());
+        ArrayList<TelemetryData> lst = new ArrayList<>();
+        for (int i = 1; i < 17; i++) {
+            TelemetryData data = new TelemetryData();
+            data.setKey("CDQ" + i);
+            data.setValue(i + 0.35);
+            lst.add(data);
+        }
+        result.setForcastData(lst);
+        return result;
+    }
+
+    //这是一个测试方法
+    public GYMessage getGYMessage() {
+        ForcastModel forcastModel = createForcastModel();
+        return ForcastModel2GYMessage(forcastModel);
+
+    }
+}

+ 9 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/forcast/TelemetryData.java

@@ -0,0 +1,9 @@
+package com.gyee.edge.gdnxak.forcast;
+
+import lombok.Data;
+
+@Data
+public class TelemetryData {
+    public String key;
+    public double value;
+}

File diff suppressed because it is too large
+ 70 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/BasicInstruction102.java


+ 63 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/FrameDecoder.java

@@ -0,0 +1,63 @@
+package com.gyee.edge.gdnxak.iec102;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+/**
+ * 
+* @ClassName: Unpack104Util  
+* @Description: 解决TCP 拆包和沾包的问题 
+* @author sun 
+ */
+public class FrameDecoder extends ByteToMessageDecoder {
+
+	@Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
+
+	    //System.out.println("Enter Iec104Decoder ......");
+        // 记录包头开始的index
+        int beginReader = 0;
+        int newDataLength = 0;
+        while (buffer.readableBytes() > 0) {
+            // 获取包头开始的index  
+            beginReader = buffer.readerIndex();  
+            // 记录一个标志用于重置
+            buffer.markReaderIndex();  
+            // 读到了协议的开始标志,结束while循环
+            byte header = buffer.readByte();
+            //第一种情况,单字节帧
+            if (header == (byte)0xE5) {
+                buffer.readerIndex(beginReader);
+                ByteBuf data = buffer.readBytes(1);
+                out.add(data);
+            } else if (header == 0x10) {
+            	//第二种情况: 固定6字节长度帧
+                // 标记当前包为新包
+                newDataLength = 6;
+                if (buffer.readableBytes() < newDataLength-1)
+                    break;
+
+                buffer.readerIndex(beginReader);
+                ByteBuf data = buffer.readBytes(newDataLength);
+                out.add(data);
+            } else if (header == 0x68) {
+                if (buffer.readableBytes() < 10)
+                    break;
+                newDataLength = buffer.readShortLE();
+
+                if (buffer.readableBytes() < newDataLength-1)
+                    break;
+                buffer.readerIndex(beginReader);
+                ByteBuf data = buffer.readBytes(newDataLength+6);
+                out.add(data);
+            }
+            continue;
+        }
+
+	}
+
+}

+ 187 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/IEC102ClientOld.java

@@ -0,0 +1,187 @@
+package com.gyee.edge.gdnxak.iec102;
+
+import com.gyee.edge.gdnxak.config.AkConfig;
+import com.gyee.edge.gdnxak.utils.SpringContextUtil;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class IEC102ClientOld {
+
+    private ChannelHandlerContext ctxIec102;
+    private Channel bridgeChannel;
+    private AkConfig akConfig = SpringContextUtil.getBean(AkConfig.class);
+
+    private SessionState state;
+    private boolean isRunning = false;
+    private boolean isStarted = false;
+
+    public IEC102ClientOld(ChannelHandlerContext ctx) {
+        ctxIec102 = ctx;
+        state = SessionState.CONNECTED;
+    }
+
+    public void start2() {
+        Iec102Message msg = BasicInstruction102.createFixFrameMessage(BasicInstruction102.M_USERDATA);
+        sendIecMessage(msg);
+        System.out.println("test end!");
+    }
+
+    public void start() {
+        if (isRunning) {
+            log.warn("worker线程已经在运行中,endPoint = " );
+            return;
+        }
+
+        //连接到Bridge服务
+        if (bridgeChannel == null || bridgeChannel.isActive() == false) {
+            getBridgeThread().start();
+        }
+
+        //发送102采集指令,心跳信号等
+        if (isStarted) {
+            log.warn("102数据发送线程已启动!client = ");
+        } else {
+            getIec102ControlThread().start();
+        }
+    }
+
+    private Thread getBridgeThread() {
+        return new Thread(new Runnable() {
+            public void run() {
+                log.info("连接到Bridge服务的线程启动...");
+                //启动一个netty客户端,连接到光功率102服务;
+                EventLoopGroup group = new NioEventLoopGroup();
+                try {
+                    Bootstrap b = new Bootstrap();
+                    b.group(group)
+                            .channel(NioSocketChannel.class)
+                            .handler(new Iec102ClientInitializer(null));
+
+                    // Start the client.
+                    ChannelFuture f = b.connect(akConfig.getIec102IP(), akConfig.getIec102Port()).sync();
+
+                    bridgeChannel = f.channel();
+                    state = SessionState.BRIDAGE_CONNECTED;
+
+                    // Wait until the connection is closed.
+                    f.channel().closeFuture().sync();
+                } catch (Exception ex) {
+                    log.error(ex.getMessage());
+                    ex.printStackTrace();
+                } finally {
+                    bridgeChannel = null;
+                    group.shutdownGracefully();
+                    close();
+                }
+            }
+        });
+    }
+
+
+    private Thread getIec102ControlThread() {
+        return new Thread(new Runnable() {
+            public void run() {
+                log.info("启动IEC102指令控制线程...");
+                try {
+                    //主站初始画过程:请求链路状态;复位链路;请求链路状态
+                    //1、请求链路状态
+                    Iec102Message msg = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_LINK_STATUS);
+                    sendIecMessage(msg);
+                    sleep(1000);
+                    //2、复位链路
+                    Iec102Message msg1 = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_RESET);
+                    sendIecMessage(msg1);
+                    sleep(1000);
+                    //3、请求链路状态
+                    Iec102Message msg12 = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_LINK_STATUS);
+                    sendIecMessage(msg12);
+                    sleep(1000);
+
+                    // 开始循环召唤数据
+                    isStarted = true;
+                    while(state == SessionState.INITIALIZED) {
+                        try {
+                            //心跳信号, 使用请求链路状态命令代替
+                            Iec102Message msg21 = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_LINK_STATUS);
+                            sendIecMessage(msg21);
+                            sleep(1000);
+                            //召唤二级数据
+                            Iec102Message msg22 = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_USER2);
+                            sendIecMessage(msg22);
+                            sleep(1000);
+                            //召唤一级数据
+                            Iec102Message msg23 = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_USER1);
+                            sendIecMessage(msg23);
+                            sleep(1000);
+
+                            //todo: 考虑在此处实现对bridge的心跳检测
+
+                        } catch (Exception e) {
+                            log.error(e.getMessage());
+                        }
+                    }
+                } catch (Exception e) {
+                    //todo: 异常处理
+                    log.error(e.getMessage());
+
+                } finally {
+                    isStarted = false;
+                }
+            }
+        });
+    }
+
+    public void close() {
+        //todo: 当bridge中断或发生其它导致无法正常采集和转发数据的情形时,此处考虑退出或异常恢复机制
+        if (bridgeChannel != null)
+            bridgeChannel.close();
+        if (ctxIec102 != null)
+            ctxIec102.close();
+        isRunning = false;
+        isStarted = false;
+    }
+
+    public void processMessage(Iec102Message response) {
+        if (response == null)
+            return;
+
+        try {
+            if (Iec102FrameType.Fixed == response.getFrameType()) {
+                //固定帧长,通过控制域第一个字节的值来区分处理
+                switch (response.getControl1()) {
+                    case 0x00:  //复位通讯
+                    case (byte) 0x0B:   //请求链路状态
+                        if (state != SessionState.INITIALIZED)
+                            state = SessionState.INITIALIZED;
+                        break;
+                    case 0x29:  //对召唤二级数据的响应
+                        break;
+                }
+            } else if (Iec102FrameType.Mutable == response.getFrameType()) {
+                //处理用户数据
+                log.info("收到用户消息,文件名:" + response.getDataFile());
+                log.info("文件内容:" + response.getDataContent());
+                //todo: 解析光功率预测文件
+
+                //todo: 将读取的功率预测数据写入到Bridge
+
+            }
+        } catch (Exception ex) {
+            log.error(ex.getMessage());
+            ex.printStackTrace();
+        }
+    }
+
+    private synchronized void sendIecMessage(Iec102Message msg) {
+        ctxIec102.writeAndFlush(msg);
+    }
+
+    private void sleep(int millionSencond) throws InterruptedException {
+        Thread.sleep(millionSencond);
+    }
+}

+ 154 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102Client.java

@@ -0,0 +1,154 @@
+package com.gyee.edge.gdnxak.iec102;
+
+import com.gyee.edge.gdnxak.forcast.ForcastModel;
+import com.gyee.edge.gdnxak.forcast.ForcastService;
+import com.gyee.edge.gdnxak.utils.SpringContextUtil;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Date;
+
+@Slf4j
+public class Iec102Client {
+
+    private String host;
+    private int port;
+    private EventLoopGroup group;
+    private Bootstrap bootstrap;
+    private boolean connected = false;
+    private Date lastActiveTime;
+
+    private ForcastService forcastService = SpringContextUtil.getBean(ForcastService.class);
+
+    /** 将<code>Channel</code>保存起来, 可用于在其他非handler的地方发送数据 */
+    private Channel channel;
+
+    public Iec102Client(String host, int port) {
+        this.host = host;
+        this.port = port;
+        group = new NioEventLoopGroup();
+        // bootstrap 可重用, 只需在TcpClient实例化的时候初始化即可.
+        bootstrap = new Bootstrap();
+        bootstrap.group(group)
+                .channel(NioSocketChannel.class)
+                .handler(new Iec102ClientInitializer(Iec102Client.this));
+    }
+
+    /**
+     * 向远程TCP服务器请求连接
+     */
+    public void connect() {
+        if (connected)
+            return;
+
+        synchronized (bootstrap) {
+            log.info("开始连接...");
+
+            ChannelFuture future = bootstrap.connect(host, port);
+            future.addListener(getConnectionListener());
+            this.channel = future.channel();
+            sleep(3000);
+        }
+    }
+
+    private ChannelFutureListener getConnectionListener() {
+        return new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (!future.isSuccess()) {
+                    future.channel().pipeline().fireChannelInactive();
+                }
+            }
+        };
+    }
+
+    public boolean isConnected() {
+        return connected;
+    }
+
+    public void setConnected(boolean bValue) {
+        connected = bValue;
+    }
+
+    public void start() {
+        //主站初始画过程:请求链路状态;复位链路;请求链路状态
+        //1、请求链路状态
+        Iec102Message msg = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_LINK_STATUS);
+        sendMessage(msg);
+        //2、复位链路
+        Iec102Message msg1 = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_RESET);
+        sendMessage(msg1);
+        //3、请求链路状态
+        Iec102Message msg12 = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_LINK_STATUS);
+        sendMessage(msg12);
+
+    }
+
+    public synchronized void sendMessage(Iec102Message msg) {
+        if (channel != null) {
+            channel.writeAndFlush(msg);
+            sleep(1000);
+        }
+    }
+
+    public void sendHeartBeat() {
+        //1、请求链路状态
+        Iec102Message msg = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_LINK_STATUS);
+        sendMessage(msg);
+    }
+
+    public void callUserData() {
+        //召唤一级数据
+        Iec102Message msg23 = BasicInstruction102.createFixFrameMessage(BasicInstruction102.C_CALL_USER1);
+//        Iec102Message msg23 = BasicInstruction102.createFixFrameMessage(BasicInstruction102.M_USERDATA2);
+        sendMessage(msg23);
+    }
+
+    public void close() {
+        if (group != null)
+            group.shutdownGracefully();
+    }
+
+    public void processMessage(Iec102Message response) {
+        if (response == null)
+            return;
+        lastActiveTime = new Date();
+
+        try {
+            if (Iec102FrameType.Fixed == response.getFrameType()) {
+                //固定帧长,通过控制域第一个字节的值来区分处理
+                switch (response.getControl1()) {
+                    case 0x00:  //复位通讯
+                    case (byte) 0x0B:   //请求链路状态
+                    case 0x29:  //对召唤二级数据的响应
+                        break;
+                }
+            } else if (Iec102FrameType.Mutable == response.getFrameType()) {
+                //处理用户数据
+                log.info("收到用户消息,文件名:" + response.getDataFile());
+               // log.debug("文件内容:" + response.getDataContent());
+                //todo: 解析光功率预测文件
+                ForcastModel forcastModel = forcastService.readForcastFileString(response.getDataContent());
+                forcastService.putForcastModel(forcastModel);
+
+            }
+        } catch (Exception ex) {
+            log.error(ex.getMessage());
+            ex.printStackTrace();
+        }
+    }
+
+
+    private void sleep(int millionSencond) {
+        try {
+            Thread.sleep(millionSencond);
+        } catch (Exception e) {}
+
+    }
+}

+ 72 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102ClientHandler.java

@@ -0,0 +1,72 @@
+package com.gyee.edge.gdnxak.iec102;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+
+@Slf4j
+@ChannelHandler.Sharable
+public class Iec102ClientHandler extends SimpleChannelInboundHandler<Iec102Message> {
+
+	private Iec102Client iec102Client;
+
+	public Iec102ClientHandler(Iec102Client iec102Client) {
+		this.iec102Client = iec102Client;
+	}
+
+	@Override
+	public void channelActive(ChannelHandlerContext ctx) throws Exception {
+
+		//启动iec102客户端的主站服务
+		iec102Client.setConnected(true);
+		iec102Client.start();
+		log.info("iec102连接成功,clientid =" + getClientId(ctx));
+	}
+
+	@Override
+	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+		//关闭旧的客户端对象
+//		if (iec102Client != null) {
+//			iec102Client.close();
+//		}
+		iec102Client.setConnected(false);
+		log.info("iec102连接断开,clientid =" + getClientId(ctx));
+	}
+	
+	@Override
+	public void channelRead0(ChannelHandlerContext ctx, Iec102Message msg102) throws IOException {
+//		log.debug("收到消息:" + msg102.getHexString());
+
+		if (iec102Client != null) {
+			iec102Client.processMessage(msg102);
+		}
+	}
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		log.info("102连接异常断开,clientid =" + getClientId(ctx));
+		cause.printStackTrace();
+
+
+		//todo: 异常处理
+		//ctx.close();
+	}
+
+	private String getClientId(ChannelHandlerContext ctx) {
+		try {
+			InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
+			String clientIp = ipSocket.getAddress().getHostAddress();
+			int clientPort = ipSocket.getPort();
+			return clientIp + ":" + clientPort;
+		} catch (Exception e) {
+			log.info("获取连接信息异常!");
+			return "";
+		}
+
+	}
+
+}

+ 26 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102ClientInitializer.java

@@ -0,0 +1,26 @@
+
+package com.gyee.edge.gdnxak.iec102;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+
+public class Iec102ClientInitializer extends ChannelInitializer<SocketChannel> {
+
+    private Iec102ClientHandler iec102ClientHandler;
+
+    public Iec102ClientInitializer(Iec102Client iec102Client) {
+        //   Assert.notNull(tcpClient, "TcpClient can not be null.");
+        this.iec102ClientHandler = new Iec102ClientHandler(iec102Client);
+    }
+
+    @Override
+    public void initChannel(SocketChannel ch) {
+        ChannelPipeline pipeline = ch.pipeline();
+
+        pipeline.addLast(new FrameDecoder());
+        pipeline.addLast(new Iec102Encoder());
+        pipeline.addLast(new Iec102Decoder());
+        pipeline.addLast(iec102ClientHandler);
+    }
+}

+ 71 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102Decoder.java

@@ -0,0 +1,71 @@
+package com.gyee.edge.gdnxak.iec102;
+
+
+import com.gyee.edge.gdnxak.utils.ByteUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.List;
+
+
+/**
+ * 解码器
+ */
+@Slf4j
+public class Iec102Decoder extends ByteToMessageDecoder {
+	
+	@Override
+	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+		byte[] data = new byte[in.readableBytes()];
+		in.readBytes(data);
+		log.info("收到报文:" + ByteUtil.byteArrayToHexString(data));
+
+		Iec102Message msg102 = new Iec102Message();
+		if (data.length == 1 && data[0] == 0xE5) {
+			msg102.setFrameType(Iec102FrameType.Single);
+			msg102.setHeader((byte)0xE5);
+		} else if (data.length == 6 && data[0] == 0x10) {
+			msg102.setFrameType(Iec102FrameType.Fixed);
+			msg102.setHeader(data[0]);
+			msg102.setControl1(data[1]);
+			msg102.setAddress(in.getShort(2));
+			msg102.setCheckSum(data[4]);
+			msg102.setFixedFrame(data);
+		} else if (data.length > 6 && data[0] == (byte)0x68) {
+			msg102.setFrameType(Iec102FrameType.Mutable);
+			msg102.setHeader(data[0]);
+			short len = in.getShortLE(1);
+			msg102.setLength(len);
+			msg102.setControl1(data[4]);
+			msg102.setAddress(in.getShortLE(5));
+			//msg102.setCheckSum(data[len-2]);
+			int udataLength = len-3;
+			in.readerIndex(7);
+			byte[] arr = Arrays.copyOfRange(data,7,len+4);
+			msg102.setUserData(arr);
+
+			msg102.setTYP(data[7]);
+			msg102.setVSQ(data[8]);
+			msg102.setCOT(data[9]);
+			msg102.setDeviceAddress(in.getShortLE(10));
+			msg102.setRecordAddress(data[12]);
+
+			byte[] arrFileName = Arrays.copyOfRange(data, 13, 45);
+			String fileName = new String(arrFileName,"UTF-8");
+			msg102.setDataFile(fileName);
+
+			byte[] arrFileData = Arrays.copyOfRange(data, 46, data.length-3);
+			String fileData = new String(arrFileData,"GBK");
+			msg102.setDataContent(fileData);
+
+			//log.info(msg102.getDataContent());
+		}
+
+		if (msg102.getHeader() != 0)
+			out.add(msg102);
+	}
+}

+ 32 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102Encoder.java

@@ -0,0 +1,32 @@
+package com.gyee.edge.gdnxak.iec102;
+
+import com.gyee.edge.gdnxak.utils.ByteUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class Iec102Encoder extends MessageToByteEncoder<Iec102Message> {
+
+	@Override
+	protected void encode(ChannelHandlerContext ctx, Iec102Message msg, ByteBuf out) throws Exception {
+		try {
+			if (msg.getFixedFrame() != null) {
+				out.writeBytes(msg.getFixedFrame());
+				log.info("发送报文:" + ByteUtil.byteArrayToHexString(msg.getFixedFrame()));
+			} else {
+				log.error("无效的消息格式!");
+				//byte[] bytes = Encoder104.encoder(msg);
+				//out.writeBytes(bytes);
+			}
+		} catch (Exception e) {
+//			log.error(e.getMessage());
+//			e.printStackTrace();
+			throw e;
+		}
+
+	}
+
+}

+ 6 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102FrameType.java

@@ -0,0 +1,6 @@
+package com.gyee.edge.gdnxak.iec102;
+
+//IEC102的帧格式:单个字符,固定长度帧,可变长度帧
+public enum Iec102FrameType {
+    Single,Fixed,Mutable
+}

+ 63 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/Iec102Message.java

@@ -0,0 +1,63 @@
+package com.gyee.edge.gdnxak.iec102;
+
+
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ *  IEC102消息的基本结构
+ */
+@Data
+public class Iec102Message {
+
+	//IEC102的帧格式:单个字符,固定长度帧,可变长度帧
+	private Iec102FrameType frameType;
+
+	//单帧
+	//单个字符:0xE5,固定长度:0x10,可变长度: 0x68
+	private byte singleFrame = (byte)0xE5;
+
+	//固定长度帧,长度6
+	private byte[] fixedFrame;
+
+	//帧头
+	private byte header;
+
+	//帧长
+	private short length;
+
+	//控制域 2字节
+	private byte control1;
+
+	//地址域: 2字节
+	private short address;
+
+	//用户数据
+	private byte[] userData;
+
+	//类型标识
+	private byte TYP;
+	//可变结构限定词
+	private byte VSQ;
+	//传送原因
+	private byte COT;
+	//设备地址
+	private short deviceAddress;
+	//记录地址
+	private byte recordAddress;
+	//数据文件名 32个字节
+	private String dataFile;
+	//数据文件内容,UTF-8格式
+	private String dataContent;
+
+	//校验和
+	private byte checkSum;
+
+	//帧尾
+	private byte footer = 0x16;
+
+	private String hexString;
+
+}

+ 27 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/iec102/SessionState.java

@@ -0,0 +1,27 @@
+package com.gyee.edge.gdnxak.iec102;
+
+
+/**
+ * 客户端会话状态
+ */
+public enum SessionState {
+    CONNECTED(0,"IEC102 tcp连接完成"),
+    BRIDAGE_DISCONNECTED (1, "BRIDGE TCP连接断开"),
+    BRIDAGE_CONNECTED (2, "BRIDGE TCP连接完成"),
+    INITIALIZED (3, "iec102初始化完成"),
+    DISCONNECTED(4, "TCP连接断开");
+    private int code;
+    private String describe;
+
+    SessionState(int code, String describe) {
+        this.code = code;
+        this.describe = describe;
+    }
+
+    public static String getDescribe(int code) throws Exception {
+        for (SessionState value : SessionState.values()) {
+            if (value.code == code) return value.describe;
+        }
+        throw  new Exception();
+    }
+}

+ 32 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/ClientHandlersInitializer.java

@@ -0,0 +1,32 @@
+package com.gyee.edge.gdnxak.nettyexample;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.util.CharsetUtil;
+
+public class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> {
+    private ReconnectHandler reconnectHandler;
+   // private EchoHandler echoHandler;
+
+    public ClientHandlersInitializer(TcpClient tcpClient) {
+     //   Assert.notNull(tcpClient, "TcpClient can not be null.");
+        this.reconnectHandler = new ReconnectHandler(tcpClient);
+       // this.echoHandler = new EchoHandler();
+    }
+
+    @Override
+    protected void initChannel(SocketChannel ch) throws Exception {
+        ChannelPipeline pipeline = ch.pipeline();
+        pipeline.addLast(this.reconnectHandler);
+        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+        pipeline.addLast(new LengthFieldPrepender(4));
+        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
+        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
+        pipeline.addLast(new Pinger());
+    }
+}

+ 30 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/ClientIdleStateTrigger.java

@@ -0,0 +1,30 @@
+package com.gyee.edge.gdnxak.nettyexample;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+
+/**
+ * <p>
+ *  用于捕获{@link IdleState#WRITER_IDLE}事件(未在指定时间内向服务器发送数据),然后向<code>Server</code>端发送一个心跳包。
+ * </p>
+ */
+public class ClientIdleStateTrigger extends ChannelInboundHandlerAdapter {
+
+    public static final String HEART_BEAT = "heart beat!";
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            IdleState state = ((IdleStateEvent) evt).state();
+            if (state == IdleState.WRITER_IDLE) {
+                // write heartbeat to server
+                ctx.writeAndFlush(HEART_BEAT);
+            }
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+
+}

+ 52 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/ExponentialBackOffRetry.java

@@ -0,0 +1,52 @@
+package com.gyee.edge.gdnxak.nettyexample;
+
+import java.util.Random;
+
+/**
+ * <p>Retry policy that retries a set number of times with increasing sleep time between retries</p>
+ */
+public class ExponentialBackOffRetry implements RetryPolicy {
+
+    private static final int MAX_RETRIES_LIMIT = 29;
+    private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;
+
+    private final Random random = new Random();
+    private final long baseSleepTimeMs;
+    private final int maxRetries;
+    private final int maxSleepMs;
+
+    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries) {
+        this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
+    }
+
+    public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) {
+        this.maxRetries = maxRetries;
+        this.baseSleepTimeMs = baseSleepTimeMs;
+        this.maxSleepMs = maxSleepMs;
+    }
+
+    @Override
+    public boolean allowRetry(int retryCount) {
+        if (retryCount < maxRetries) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public long getSleepTimeMs(int retryCount) {
+        if (retryCount < 0) {
+            throw new IllegalArgumentException("retries count must greater than 0.");
+        }
+        if (retryCount > MAX_RETRIES_LIMIT) {
+            System.out.println(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
+            retryCount = MAX_RETRIES_LIMIT;
+        }
+        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << retryCount));
+        if (sleepMs > maxSleepMs) {
+            System.out.println(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
+            sleepMs = maxSleepMs;
+        }
+        return sleepMs;
+    }
+}

+ 64 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/Pinger.java

@@ -0,0 +1,64 @@
+package com.gyee.edge.gdnxak.nettyexample;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.ScheduledFuture;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>客户端连接到服务器端后,会循环执行一个任务:随机等待几秒,然后ping一下Server端,即发送一个心跳包。</p>
+ */
+public class Pinger extends ChannelInboundHandlerAdapter {
+
+    private Random random = new Random();
+    private int baseRandom = 8;
+
+    private Channel channel;
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        super.channelActive(ctx);
+        this.channel = ctx.channel();
+
+        ping(ctx.channel());
+    }
+
+    private void ping(Channel channel) {
+        int second = Math.max(1, random.nextInt(baseRandom));
+        System.out.println("next heart beat will send after " + second + "s.");
+        ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() {
+            @Override
+            public void run() {
+                if (channel.isActive()) {
+                    System.out.println("sending heart beat to the server...");
+                    channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
+                } else {
+                    System.err.println("The connection had broken, cancel the task that will send a heart beat.");
+                    channel.closeFuture();
+                    throw new RuntimeException();
+                }
+            }
+        }, second, TimeUnit.SECONDS);
+
+        future.addListener(new GenericFutureListener() {
+            @Override
+            public void operationComplete(Future future) throws Exception {
+                if (future.isSuccess()) {
+                    ping(channel);
+                }
+            }
+        });
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        // 当Channel已经断开的情况下, 仍然发送数据, 会抛异常, 该方法会被调用.
+        cause.printStackTrace();
+        ctx.close();
+    }
+}

+ 59 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/ReconnectHandler.java

@@ -0,0 +1,59 @@
+package com.gyee.edge.gdnxak.nettyexample;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.EventLoop;
+
+import java.util.concurrent.TimeUnit;
+
+@ChannelHandler.Sharable
+public class ReconnectHandler extends ChannelInboundHandlerAdapter {
+
+    private int retries = 0;
+    private RetryPolicy retryPolicy;
+
+    private TcpClient tcpClient;
+
+    public ReconnectHandler(TcpClient tcpClient) {
+        this.tcpClient = tcpClient;
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        System.out.println("Successfully established a connection to the server.");
+        retries = 0;
+        ctx.fireChannelActive();
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        if (retries == 0) {
+            System.err.println("Lost the TCP connection with the server.");
+            ctx.close();
+        }
+
+        boolean allowRetry = getRetryPolicy().allowRetry(retries);
+        if (allowRetry) {
+
+            long sleepTimeMs = getRetryPolicy().getSleepTimeMs(retries);
+
+            System.out.println(String.format("Try to reconnect to the server after %dms. Retry count: %d.", sleepTimeMs, ++retries));
+
+            final EventLoop eventLoop = ctx.channel().eventLoop();
+            eventLoop.schedule(() -> {
+                System.out.println("Reconnecting ...");
+                tcpClient.connect();
+            }, sleepTimeMs, TimeUnit.MILLISECONDS);
+        }
+        ctx.fireChannelInactive();
+    }
+
+
+    private RetryPolicy getRetryPolicy() {
+        if (this.retryPolicy == null) {
+            this.retryPolicy = tcpClient.getRetryPolicy();
+        }
+        return this.retryPolicy;
+    }
+}

+ 21 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/RetryPolicy.java

@@ -0,0 +1,21 @@
+package com.gyee.edge.gdnxak.nettyexample;
+
+public interface RetryPolicy {
+    /**
+     * Called when an operation has failed for some reason. This method should return
+     * true to make another attempt.
+     *
+     * @param retryCount the number of times retried so far (0 the first time)
+     * @return true/false
+     */
+    boolean allowRetry(int retryCount);
+
+    /**
+     * get sleep time in ms of current retry count.
+     *
+     * @param retryCount current retry count
+     * @return the time to sleep
+     */
+    long getSleepTimeMs(int retryCount);
+
+}

+ 66 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/nettyexample/TcpClient.java

@@ -0,0 +1,66 @@
+package com.gyee.edge.gdnxak.nettyexample;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+public class TcpClient {
+    private String host;
+    private int port;
+    private Bootstrap bootstrap;
+    /** 重连策略 */
+    private RetryPolicy retryPolicy;
+    /** 将<code>Channel</code>保存起来, 可用于在其他非handler的地方发送数据 */
+    private Channel channel;
+
+    public TcpClient(String host, int port) {
+        this(host, port, new ExponentialBackOffRetry(1000, Integer.MAX_VALUE, 60 * 1000));
+    }
+
+    public TcpClient(String host, int port, RetryPolicy retryPolicy) {
+        this.host = host;
+        this.port = port;
+        this.retryPolicy = retryPolicy;
+        init();
+    }
+
+    /**
+     * 向远程TCP服务器请求连接
+     */
+    public void connect() {
+        synchronized (bootstrap) {
+            ChannelFuture future = bootstrap.connect(host, port);
+            future.addListener(getConnectionListener());
+            this.channel = future.channel();
+        }
+    }
+
+    public RetryPolicy getRetryPolicy() {
+        return retryPolicy;
+    }
+
+    private void init() {
+        EventLoopGroup group = new NioEventLoopGroup();
+        // bootstrap 可重用, 只需在TcpClient实例化的时候初始化即可.
+        bootstrap = new Bootstrap();
+        bootstrap.group(group)
+                .channel(NioSocketChannel.class)
+                .handler(new ClientHandlersInitializer(TcpClient.this));
+    }
+
+    private ChannelFutureListener getConnectionListener() {
+        return new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (!future.isSuccess()) {
+                    future.channel().pipeline().fireChannelInactive();
+                }
+            }
+        };
+    }
+
+}

+ 422 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/utils/ByteUtil.java

@@ -0,0 +1,422 @@
+package com.gyee.edge.gdnxak.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Calendar;
+import java.util.Date;
+
+/**
+ * 
+* @ClassName: ByteUtil  
+* @Description: byte 工具类 
+* @author sun 
+ */
+public class ByteUtil {
+
+    public static byte[] float2Bytes(float value) {
+
+        int tempVal = Float.floatToIntBits(value);
+        byte[] result = new byte[4];
+
+        result[0] = (byte) tempVal;
+        result[1] = (byte) (tempVal >> 8);
+        result[2] = (byte) (tempVal >> 16);
+        result[3] = (byte) (tempVal >> 24);
+
+        return result;
+    }
+
+    public static float bytes2Float(byte[] bytes) {
+
+        return Float.intBitsToFloat((bytes[0] & 0xff) |
+                ((bytes[1] & 0xff) << 8) |
+                ((bytes[2] & 0xff) << 16) |
+                ((bytes[3] & 0xff) << 24));
+
+    }
+
+	/**
+	 * 
+	* @Title: intToByteArray  
+	* @Description: int 转换成 byte数组 
+	* @param @param i
+	* @param @return 
+	* @return byte[]   
+	* @throws
+	 */
+	public static byte[] intToByteArray(int i) {
+        byte[] result = new byte[4];
+        result[0] = (byte) ((i >> 24) & 0xFF);
+        result[1] = (byte) ((i >> 16) & 0xFF);
+        result[2] = (byte) ((i >> 8) & 0xFF);
+        result[3] = (byte) (i & 0xFF);
+        return result;
+    }
+	
+	/**
+	* @Title: shortToByteArray  
+	* @Description: short 转换成 byte[] 
+	* @param @param val
+	* @param @return 
+	* @return byte[]   
+	* @throws
+	 */
+	public static byte[] shortToByteArray(short val) {
+		byte[] b = new byte[2];
+		b[0] = (byte) ((val >> 8) & 0xff);
+		b[1] = (byte) (val & 0xff);
+		return b;
+	}
+	
+	/**
+	 * 
+	* @Title: byteArrayToInt  
+	* @Description: byte[] 转换成 int
+	* @param @param bytes
+	* @param @return 
+	* @return int   
+	* @throws
+	 */
+	public static int byteArrayToInt(byte[] bytes) {
+        int value = 0;
+        for (int i = 0; i < 4; i++) {
+            int shift = (3 - i) * 8;
+            value += (bytes[i] & 0xFF) << shift;
+        }
+        return value;
+    }
+	
+	/**
+	 * 
+	* @Title: byteArrayToShort  
+	* @Description: byte[] 转换成short 
+	* @param @param bytes
+	* @param @return 
+	* @return short   
+	* @throws
+	 */
+	public static short byteArrayToShort(byte[] bytes) {
+        short value = 0;
+        for (int i = 0; i < 2; i++) {
+            int shift = (1 - i) * 8;
+            value += (bytes[i] & 0xFF) << shift;
+        }
+        return value;
+    }
+	
+	
+//	/**
+//	 * 
+//	* @Title: listToBytes  
+//	* @Description: TODO 
+//	* @param @param byteList
+//	* @param @return 
+//	* @return byte[]   
+//	* @throws
+//	 */
+//	public static byte[] listToBytes(List<Byte> byteList) {
+//		byte[] bytes = new byte[byteList.size()];
+//		int index = 0;
+//		for (Byte item : byteList) {
+//			bytes[index++] = item;
+//		}
+//		return bytes;
+//	}
+	
+	/**
+	 * 
+	* @Title: date2HByte  
+	* @Description: 日期转换成 CP56Time2a
+	* @param @param date
+	* @param @return 
+	* @return byte[]   
+	* @throws
+	 */
+    public static byte[] date2Hbyte(Date date) {
+    	ByteArrayOutputStream bOutput = new ByteArrayOutputStream();
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(date);
+        // 毫秒需要转换成两个字节其中 低位在前高位在后 
+        // 先转换成short
+        int millisecond = calendar.get(Calendar.SECOND) * 1000 + calendar.get(Calendar.MILLISECOND);
+        
+        // 默认的高位在前
+        byte[] millisecondByte = intToByteArray(millisecond);
+        bOutput.write((byte) millisecondByte[3]);
+        bOutput.write((byte) millisecondByte[2]);
+        
+        // 分钟 只占6个比特位 需要把前两位置为零 
+        bOutput.write((byte) calendar.get(Calendar.MINUTE));
+        // 小时需要把前三位置零
+        bOutput.write((byte) calendar.get(Calendar.HOUR_OF_DAY));
+        // 星期日的时候 week 是0 
+        int week = calendar.get(Calendar.DAY_OF_WEEK);
+        if (week == Calendar.SUNDAY) {
+        	week = 7;
+        } else {
+        	week--;
+        } 
+        // 前三个字节是 星期 因此需要将星期向左移5位  后五个字节是日期  需要将两个数字相加 相加之前需要先将前三位置零
+        bOutput.write((byte) (week << 5) + (calendar.get(Calendar.DAY_OF_MONTH)));
+        // 前四字节置零
+        bOutput.write((byte) ((byte) calendar.get(Calendar.MONTH) + 1));
+        bOutput.write((byte) (calendar.get(Calendar.YEAR) - 2000));
+        return bOutput.toByteArray();
+    }
+    
+    
+    /**
+	 * 
+	* @Title: date2HByte  
+	* @Description:CP56Time2a转换成  时间
+	* @param @param date
+	* @param @return 
+	* @return byte[]   
+	* @throws
+	 */
+    public static Date  byte2Hdate(byte[] dataByte) {
+        int year = (dataByte[6] & 0x7F) + 2000;
+        int month = dataByte[5] & 0x0F;
+        int day = dataByte[4] & 0x1F;
+        int hour = dataByte[3] & 0x1F;
+        int minute = dataByte[2] & 0x3F;
+        int second = dataByte[1] > 0 ? dataByte[1] : (int) (dataByte[1] & 0xff);
+        int millisecond = dataByte[0] > 0 ? dataByte[0] : (int) (dataByte[0] & 0xff);
+        millisecond = (second << 8) + millisecond;
+        second = millisecond / 1000;
+        millisecond = millisecond % 1000;
+        Calendar calendar = Calendar.getInstance();
+        calendar.set(Calendar.YEAR, year);
+        calendar.set(Calendar.MONTH, month);
+        calendar.set(Calendar.DAY_OF_MONTH, day);
+        calendar.set(Calendar.HOUR_OF_DAY, hour);
+        calendar.set(Calendar.MINUTE, minute);
+        calendar.set(Calendar.SECOND, second);
+        calendar.set(Calendar.MILLISECOND, millisecond);
+        return calendar.getTime();
+    }
+
+	public static String byteArrayToHexString(byte[] array) {
+        return byteArray2HexString(array, Integer.MAX_VALUE, false);
+    }
+
+	public static String byteArray2HexString(byte[] arrBytes, int count, boolean blank) {
+        String ret = "";
+        if (arrBytes == null || arrBytes.length < 1) {
+        	return ret;
+        }
+        if (count > arrBytes.length) {
+        	count = arrBytes.length;
+        }
+        StringBuilder builder = new StringBuilder();
+
+        for (int i = 0; i < count; i++) {
+            ret = Integer.toHexString(arrBytes[i] & 0xFF).toUpperCase();
+            if (ret.length() == 1) {
+            	builder.append("0").append(ret);
+            } else {
+            	builder.append(ret);
+            }
+            if (blank) {
+            	builder.append(" ");
+            }
+        }
+
+        return builder.toString();
+
+    }
+
+    /**
+     * 返回指定位置的数组
+     * @param bytes
+     * @param start 开始位置
+     * @param length  截取长度
+     * @return
+     */
+	public  static byte[] getByte(byte[] bytes, int start, int length) {
+		byte[] ruleByte = new byte[length];
+		int index = 0;
+		while (index < length) {
+			ruleByte[index++] = bytes[start++];
+		}
+		return ruleByte;
+	}
+
+    /**
+     * 将int转为高字节在前,低字节在后的byte数组(大端)
+     * @param n int
+     * @return byte[]
+     */
+    public static byte[] intToByteBig(int n) {
+        byte[] b = new byte[4];
+        b[3] = (byte) (n & 0xff);
+        b[2] = (byte) (n >> 8 & 0xff);
+        b[1] = (byte) (n >> 16 & 0xff);
+        b[0] = (byte) (n >> 24 & 0xff);
+        return b;
+    }
+    /**
+     * 将int转为低字节在前,高字节在后的byte数组(小端)
+     * @param n int
+     * @return byte[]
+     */
+    public static byte[] intToByteLittle(int n) {
+        byte[] b = new byte[4];
+        b[0] = (byte) (n & 0xff);
+        b[1] = (byte) (n >> 8 & 0xff);
+        b[2] = (byte) (n >> 16 & 0xff);
+        b[3] = (byte) (n >> 24 & 0xff);
+        return b;
+    }
+    /**
+     * byte数组到int的转换(小端)
+     * @param bytes
+     * @return
+     */
+    public static int bytes2IntLittle(byte[] bytes )
+    {
+        int int1=bytes[0]&0xff;
+        int int2=(bytes[1]&0xff)<<8;
+        int int3=(bytes[2]&0xff)<<16;
+        int int4=(bytes[3]&0xff)<<24;
+
+        return int1|int2|int3|int4;
+    }
+    /**
+     * byte数组到int的转换(大端)
+     * @param bytes
+     * @return
+     */
+    public static int bytes2IntBig(byte[] bytes )
+    {
+        int int1=bytes[3]&0xff;
+        int int2=(bytes[2]&0xff)<<8;
+        int int3=(bytes[1]&0xff)<<16;
+        int int4=(bytes[0]&0xff)<<24;
+
+        return int1|int2|int3|int4;
+    }
+    /**
+     * 将short转为高字节在前,低字节在后的byte数组(大端)
+     * @param n short
+     * @return byte[]
+     */
+    public static byte[] shortToByteBig(short n) {
+        byte[] b = new byte[2];
+        b[1] = (byte) (n & 0xff);
+        b[0] = (byte) (n >> 8 & 0xff);
+        return b;
+    }
+
+    /**
+     * 将short转为低字节在前,高字节在后的byte数组(小端)
+     * @param n short
+     * @return byte[]
+     */
+    public static byte[] shortToByteLittle(short n) {
+        byte[] b = new byte[2];
+        b[0] = (byte) (n & 0xff);
+        b[1] = (byte) (n >> 8 & 0xff);
+        return b;
+    }
+    /**
+     *  读取小端byte数组为short
+     * @param b
+     * @return
+     */
+    public static short byteToShortLittle(byte[] b) {
+        return (short) (((b[1] << 8) | b[0] & 0xff));
+    }
+    /**
+     *  读取大端byte数组为short
+     * @param b
+     * @return
+     */
+    public static short byteToShortBig(byte[] b) {
+        return (short) (((b[0] << 8) | b[1] & 0xff));
+    }
+    /**
+     * long类型转byte[] (大端)
+     * @param n
+     * @return
+     */
+    public static byte[] longToBytesBig(long n) {
+        byte[] b = new byte[8];
+        b[7] = (byte) (n & 0xff);
+        b[6] = (byte) (n >> 8  & 0xff);
+        b[5] = (byte) (n >> 16 & 0xff);
+        b[4] = (byte) (n >> 24 & 0xff);
+        b[3] = (byte) (n >> 32 & 0xff);
+        b[2] = (byte) (n >> 40 & 0xff);
+        b[1] = (byte) (n >> 48 & 0xff);
+        b[0] = (byte) (n >> 56 & 0xff);
+        return b;
+    }
+    /**
+     * long类型转byte[] (小端)
+     * @param n
+     * @return
+     */
+    public static byte[] longToBytesLittle(long n) {
+        byte[] b = new byte[8];
+        b[0] = (byte) (n & 0xff);
+        b[1] = (byte) (n >> 8  & 0xff);
+        b[2] = (byte) (n >> 16 & 0xff);
+        b[3] = (byte) (n >> 24 & 0xff);
+        b[4] = (byte) (n >> 32 & 0xff);
+        b[5] = (byte) (n >> 40 & 0xff);
+        b[6] = (byte) (n >> 48 & 0xff);
+        b[7] = (byte) (n >> 56 & 0xff);
+        return b;
+    }
+    /**
+     * byte[]转long类型(小端)
+     * @param array
+     * @return
+     */
+    public static long bytesToLongLittle( byte[] array )
+    {
+        return ((((long) array[ 0] & 0xff) << 0)
+                | (((long) array[ 1] & 0xff) << 8)
+                | (((long) array[ 2] & 0xff) << 16)
+                | (((long) array[ 3] & 0xff) << 24)
+                | (((long) array[ 4] & 0xff) << 32)
+                | (((long) array[ 5] & 0xff) << 40)
+                | (((long) array[ 6] & 0xff) << 48)
+                | (((long) array[ 7] & 0xff) << 56));
+    }
+
+    /**
+     * byte[]转long类型(大端)
+     * @param array
+     * @return
+     */
+    public static long bytesToLongBig( byte[] array )
+    {
+        return ((((long) array[ 0] & 0xff) << 56)
+                | (((long) array[ 1] & 0xff) << 48)
+                | (((long) array[ 2] & 0xff) << 40)
+                | (((long) array[ 3] & 0xff) << 32)
+                | (((long) array[ 4] & 0xff) << 24)
+                | (((long) array[ 5] & 0xff) << 16)
+                | (((long) array[ 6] & 0xff) << 8)
+                | (((long) array[ 7] & 0xff) << 0));
+    }
+
+    /**
+     * 字节数组转化为16进制字符串形式
+     * @param b byte数组
+     * @return  16进制字符串
+     */
+    public static String bytes2HexString(byte[] b) {
+        String ret = "";
+        for (int i = 0; i < b.length; i++) {
+            String hex = Integer.toHexString(b[i] & 0xFF);
+            if (hex.length() == 1) {
+                hex = '0' + hex;
+            }
+            ret += hex;
+        }
+        return ret;
+    }
+
+}

+ 62 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/utils/SpringContextUtil.java

@@ -0,0 +1,62 @@
+package com.gyee.edge.gdnxak.utils;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class SpringContextUtil implements ApplicationContextAware {
+
+    /**
+     * 上下文对象实例
+     */
+    private static ApplicationContext applicationContext;
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
+    /**
+     * 获取applicationContext
+     *
+     * @return
+     */
+    public static ApplicationContext getApplicationContext() {
+        return applicationContext;
+    }
+
+    /**
+     * 通过name获取 Bean.
+     *
+     * @param name
+     * @return
+     */
+    public static Object getBean(String name) {
+        return getApplicationContext().getBean(name);
+    }
+
+    /**
+     * 通过class获取Bean.
+     *
+     * @param clazz
+     * @param <T>
+     * @return
+     */
+    public static <T> T getBean(Class<T> clazz) {
+        return getApplicationContext().getBean(clazz);
+    }
+
+    /**
+     * 通过name,以及Clazz返回指定的Bean
+     *
+     * @param name
+     * @param clazz
+     * @param <T>
+     * @return
+     */
+    public static <T> T getBean(String name, Class<T> clazz) {
+        return getApplicationContext().getBean(name, clazz);
+    }
+}

+ 261 - 0
gdnxak102/src/main/java/com/gyee/edge/gdnxak/utils/Util.java

@@ -0,0 +1,261 @@
+package com.gyee.edge.gdnxak.utils;
+
+
+import java.util.Calendar;
+import java.util.Date;
+
+/**
+ * 工具类
+ *
+ * @author Administrator 2018-04-07
+ */
+
+public class Util {
+    /**
+     * 字节数组转换成String,指定长度转换长度
+     *
+     * @param arrBytes
+     * @param count    转换长度
+     * @param blank    要不要空格(每个byte字节,最是否用一个“ ”隔开)
+     * @return "" | arrBytes换成的字符串(不存在null)
+     */
+    public static String byteArray2HexString(byte[] arrBytes, int count, boolean blank) {
+        String ret = "";
+        if (arrBytes == null || arrBytes.length < 1)
+            return ret;
+        if (count > arrBytes.length)
+            count = arrBytes.length;
+        StringBuilder builder = new StringBuilder();
+
+        for (int i = 0; i < count; i++) {
+            ret = Integer.toHexString(arrBytes[i] & 0xFF).toUpperCase();
+            if (ret.length() == 1)
+                builder.append("0").append(ret);
+            else
+                builder.append(ret);
+            if (blank)
+                builder.append(" ");
+        }
+
+        return builder.toString();
+
+    }
+
+    /**
+     * 将两个ASCII字符合成一个字节; 如:"EF"--> 0xEF
+     *
+     * @param src0 byte
+     * @param src1 byte
+     * @return byte
+     */
+    public static byte uniteBytes(byte src0, byte src1) {
+        byte _b0 = Byte.decode("0x" + new String(new byte[]{src0})).byteValue();
+        _b0 = (byte) (_b0 << 4);
+        byte _b1 = Byte.decode("0x" + new String(new byte[]{src1})).byteValue();
+        byte ret = (byte) (_b0 ^ _b1);
+        return ret;
+    }
+
+    /**
+     * 将字节数组转换成16进制字符串
+     *
+     * @param array 需要转换的字符串(字节间没有分隔符)
+     * @return 转换完成的字符串
+     */
+    public static String byteArrayToHexString(byte[] array) {
+        return byteArray2HexString(array, Integer.MAX_VALUE, false);
+    }
+
+    /**
+     * 时标CP56Time2a解析
+     *
+     * @param b 时标CP56Time2a(长度为7 的int数组)
+     * @return 解析结果
+     */
+    public static String TimeScale(int b[]) {
+
+        String str = "";
+        int year = b[6] & 0x7F;
+        int month = b[5] & 0x0F;
+        int day = b[4] & 0x1F;
+        int week = (b[4] & 0xE0) / 32;
+        int hour = b[3] & 0x1F;
+        int minute = b[2] & 0x3F;
+        int second = (b[1] << 8) + b[0];
+
+        str += "时标CP56Time2a:" + "20" + year + "-"
+                + String.format("%02d", month) + "-"
+                + String.format("%02d", day) + "," + hour + ":" + minute + ":"
+                + second / 1000 + "." + second % 1000;
+        return str + "\n";
+    }
+
+    /**
+     * 16进制表示的字符串转换为字节数组
+     *
+     * @param s 16进制表示的字符串
+     * @return byte[] 字节数组
+     */
+    public static int[] hexStringToIntArray(String s) {
+        int len = s.length();
+        int[] b = new int[len / 2];
+        for (int i = 0; i < len; i += 2) {
+            // 两位一组,表示一个字节,把这样表示的16进制字符串,还原成一个字节
+            b[i / 2] = (int) ((Character.digit(s.charAt(i), 16) << 4) + Character
+                    .digit(s.charAt(i + 1), 16));
+        }
+        return b;
+    }
+
+    /**
+     * int转换成16进制字符串
+     *
+     * @param b 需要转换的int值
+     * @return 16进制的String
+     */
+    public static String toHexString(int b) {
+        String hex = Integer.toHexString(b & 0xFF);
+        if (hex.length() == 1) {
+            hex = '0' + hex;
+        }
+        return "0x" + hex.toUpperCase();
+    }
+
+    /**
+     * 检验CS校验和
+     *
+     * @param vCS 需要检验的部分
+     * @param i   CS校验和
+     * @return 检验结果(字符串格式)
+     */
+    public static String variableCS(int[] vCS, int i) {
+        String str = "";
+        str += "校验和CS=";
+        str += toHexString(i);
+        int sum = 0;
+        for (int j = 0; j < vCS.length; j++) {
+            sum += vCS[j];
+        }
+        if ((sum % 256) == i) {
+            str += "   校验无误!";
+
+        } else {
+            str += "   经校验,报文有误!";
+        }
+        return str;
+
+    }
+
+    /**
+     * 解析地址域
+     *
+     * @param low  第一个地址
+     * @param high 第二个地址
+     * @return
+     */
+    public static String address(int low, int high) {
+
+        String lowString = String.format("%02X", low);
+        String highString = String.format("%02X", high);
+
+        return highString + lowString + "H" + "\n";
+    }
+
+    public static String getAddressStr(int address) {
+        String addressStr = String.format("%04X", address);
+        return addressStr.substring(2, 4) + addressStr.substring(0, 2);
+    }
+
+    public static String getAddressStr3(int address) {
+        String addressStr = String.format("%06X", address);
+        return addressStr.substring(4, 6) + addressStr.substring(2, 4) + addressStr.substring(0, 2);
+    }
+
+    public static String getInfoStr(int info, int length) {
+        StringBuilder builder = new StringBuilder();
+        String infoFormat = "%0" + 2 * length + "X";
+        String infoStr = String.format(infoFormat, info);
+        for (int i = infoStr.length()/2; i >0; i--) {
+            builder.append(infoStr.substring(2*i-2,2*i));
+        }
+        return builder.toString();
+    }
+
+    public static String date2HStr(Date date) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(date);
+        StringBuilder builder = new StringBuilder();
+        String milliSecond = String.format("%04X", (calendar.get(Calendar.SECOND) * 1000) + calendar.get(Calendar.MILLISECOND));
+        builder.append(milliSecond.substring(2, 4));
+        builder.append(milliSecond.substring(0, 2));
+        builder.append(String.format("%02X", calendar.get(Calendar.MINUTE) & 0x3F));
+        builder.append(String.format("%02X", calendar.get(Calendar.HOUR_OF_DAY) & 0x1F));
+        int week = calendar.get(Calendar.DAY_OF_WEEK);
+        if (week == Calendar.SUNDAY)
+            week = 7;
+        else week--;
+        builder.append(String.format("%02X", (week << 5) + (calendar.get(Calendar.DAY_OF_MONTH) & 0x1F)));
+        builder.append(String.format("%02X", calendar.get(Calendar.MONTH) + 1));
+        builder.append(String.format("%02X", calendar.get(Calendar.YEAR) - 2000));
+        return builder.toString();
+    }
+
+    public static long add2long(int address) {
+        int addressLong = ((address & 0xff00) >> 8) + (address & 0x00ff);
+        return addressLong % 256;
+    }
+
+    public static long CP56Time2Long(Date dateTime) {
+        long sum = 0;
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(dateTime);
+        StringBuilder builder = new StringBuilder();
+        long milliSecond = (calendar.get(Calendar.SECOND) * 1000) + calendar.get(Calendar.MILLISECOND);
+        sum += ((milliSecond & 0xff00) >> 8) % 256;
+        sum += (milliSecond & 0x00ff) % 256;
+        sum += (calendar.get(Calendar.MINUTE) & 0x3F) % 256;
+        sum += (calendar.get(Calendar.HOUR_OF_DAY) & 0x1F) % 256;
+
+        int week = calendar.get(Calendar.DAY_OF_WEEK);
+        if (week == Calendar.SUNDAY)
+            week = 7;
+        else week--;
+        sum += ((week << 5) + (calendar.get(Calendar.DAY_OF_MONTH) & 0x1F)) % 256;
+        sum += (calendar.get(Calendar.MONTH) + 1) % 256;
+        sum += (calendar.get(Calendar.YEAR) - 2000) % 256;
+        return sum;
+    }
+
+    /**
+     * 获取I格式的104报文的控制域
+     */
+    public static String getInformationTransmitFormat(int sendNumber, int receiveNnumber) {
+        sendNumber = sendNumber << 1;
+        receiveNnumber = receiveNnumber << 1;
+        String s = String.format("%04X", sendNumber);
+        String r = String.format("%04X", receiveNnumber);
+
+        return s.substring(2, 4) + s.substring(0, 2) + r.substring(2, 4) + r.substring(0, 2);
+    }
+
+    /**
+     * 获取S格式的104报文的控制域
+     */
+    public static String getNumberedSupervisoryFunction(int receiveNnumber) {
+        receiveNnumber = receiveNnumber << 1;
+        String r = String.format("%04X", receiveNnumber);
+
+        return "01" + "00" + r.substring(2, 4) + r.substring(0, 2);
+    }
+
+    /**
+     * 获取U格式的104报文的控制域
+     */
+    public static String getUnnumberedControlFunction(boolean tester, boolean stopdt, boolean startdt) {
+        int con = 3;
+        con += (tester ? 2 : 1) << 6;
+        con += (stopdt ? 2 : 1) << 4;
+        con += (startdt ? 2 : 1) << 2;
+        return String.format("%02X", con) + "000000";
+    }
+}

+ 20 - 0
gdnxak102/src/main/resources/application.yaml

@@ -0,0 +1,20 @@
+spring:
+  application:
+  name: gdnxak102
+
+#此处配置南瑞102服务的地址
+iec102:
+  iec102Ip: 127.0.0.1 #192.168.168.8
+  iec102Port: 8007 #3456
+  #轮询102服务的时间间隔,默认5秒
+  scanInterval: 10000
+  #心跳监测时间间隔
+  heartbeatInterval: 10000
+  #消息缓存最大数量
+  maxCacheCount: 200
+  #此处配置王博采集桥bridge服务的地址
+  bridgeIp: 127.0.0.1 #21.6.34.1
+  bridgePort: 8007 #31010
+  cdqAddressStart: 7400020
+  dqAddressStart: 7400036
+  cftAddressStart: 7400000

+ 8 - 0
gdnxak102/src/main/resources/banner.txt

@@ -0,0 +1,8 @@
+    _____    ______        __      _   __     __     ____   ____      ______   
+   / ___ \  (_  __ \      /  \    / ) (_ \   / _)   /   /  / __ \    (____  \  
+  / /   \_)   ) ) \ \    / /\ \  / /    \ \_/ /    / /) ) ( (  ) )        ) /  
+ ( (  ____   ( (   ) )   ) ) ) ) ) )     \   /    /_/( (  ( (  ) )   __  / /   
+ ( ( (__  )   ) )  ) )  ( ( ( ( ( (      / _ \        ) ) ( (  ) )  /  \/ / __ 
+  \ \__/ /   / /__/ /   / /  \ \/ /    _/ / \ \_     ( (  ( (__) ) ( () \__/ / 
+   \____/   (______/   (_/    \__/    (__/   \__)    /__\  \____/   \__\____(  
+