Browse Source

gateway基础

songwenbin 2 years ago
parent
commit
43efc77637

+ 43 - 0
gateway/src/main/java/com/gyee/edge/gateway/client/ClientFactory.java

@@ -0,0 +1,43 @@
+package com.gyee.edge.gateway.client;
+
+import com.gyee.edge.gateway.config.cache.CacheService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * 通过Endpoint,创建Client对象,并将Client对象放入ClientManager
+ */
+@Component
+public class ClientFactory {
+
+    @Autowired
+    CacheService cacheService;
+
+    @Autowired
+    ClientManager clientManager;
+
+
+    public void CreateClients() {
+        List<EndPoint> endPoints = cacheService.getEndPointList();
+        for(EndPoint endPoint : endPoints) {
+            if (endPoint.getDirection() == 0) {
+                IClient iClient = createClient(endPoint);
+                clientManager.addClient(iClient);
+            }
+        }
+
+    }
+
+    public IClient createClient(EndPoint endPoint) {
+        //todo: 通过endpoint生成不同的IClient
+        return new IEC102TcpClient();
+    }
+
+
+
+
+
+
+}

+ 33 - 0
gateway/src/main/java/com/gyee/edge/gateway/client/ClientManager.java

@@ -0,0 +1,33 @@
+package com.gyee.edge.gateway.client;
+
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * 客户端管理,状态检查等
+ * 批量的客户端管理任务,比如刷新连接状态,检查客户端连接是否正常,发送心跳信号,批量连接,断开,重连等
+ *
+ */
+@Component
+public class ClientManager {
+
+    private List<IClient> clientList;
+
+    public List<IClient> getClientList() {
+        return clientList;
+    }
+
+    public void addClient(IClient iClient) {
+        clientList.add(iClient);
+    }
+
+    public void removeClient(IClient iClient) {
+        clientList.remove(iClient);
+    }
+
+
+
+
+
+}

+ 25 - 0
gateway/src/main/java/com/gyee/edge/gateway/client/EndPoint.java

@@ -0,0 +1,25 @@
+package com.gyee.edge.gateway.client;
+
+import lombok.Data;
+
+@Data
+public class EndPoint {
+    private int id;
+    private String connectionType;
+    private int direction;
+    private String protocol;
+    private String host;
+    private int port;
+    private String serialPort;
+    private int baudRate;
+    private int startBit;
+    private int stopBit;
+    private int checkBit;
+    private String userName;
+    private String password;
+    private String token;
+    private String connectionString;
+    private String path;
+    private boolean enabled;
+
+}

+ 26 - 0
gateway/src/main/java/com/gyee/edge/gateway/client/IClient.java

@@ -0,0 +1,26 @@
+package com.gyee.edge.gateway.client;
+
+import com.gyee.edge.gateway.message.GYMessage;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+
+public interface IClient {
+
+    EndPoint getEndPoint();
+
+    Future<Integer> connect();
+
+    Future<Integer> reconnect();
+
+    void disconnect();
+
+    boolean isConnected();
+
+    EventLoopGroup getEventLoop();
+
+    Future<Integer> sendMessage(GYMessage gyMessage);
+
+    Future<Integer> send(Byte[] bytes);
+
+
+}

+ 39 - 0
gateway/src/main/java/com/gyee/edge/gateway/client/IEC102RxtxClientHandler.java

@@ -0,0 +1,39 @@
+package com.gyee.edge.gateway.client;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+/**
+ *
+ */
+public class IEC102RxtxClientHandler extends SimpleChannelInboundHandler<String> {
+
+    private ChannelHandlerContext ctx;
+    public ChannelHandlerContext getCtx() {
+        return ctx;
+    }
+    public void setCtx(ChannelHandlerContext ctx) {
+        this.ctx = ctx;
+    }
+
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        ctx.writeAndFlush("success\n");
+        this.ctx = ctx;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
+        System.out.println("接收到:"+msg);
+        //ctx.writeAndFlush("已收到\n");
+        //ctx.close();
+    }
+
+    public void write(String msg){
+        if(ctx!=null){
+            ctx.writeAndFlush(msg);
+        }
+    }
+
+}

+ 141 - 0
gateway/src/main/java/com/gyee/edge/gateway/client/IEC102SerialClient.java

@@ -0,0 +1,141 @@
+package com.gyee.edge.gateway.client;
+
+import com.gyee.edge.gateway.message.GYMessage;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.oio.OioEventLoopGroup;
+import io.netty.channel.rxtx.RxtxChannel;
+import io.netty.channel.rxtx.RxtxChannelConfig;
+import io.netty.channel.rxtx.RxtxDeviceAddress;
+import io.netty.handler.codec.LineBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.util.concurrent.Future;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+
+public class IEC102SerialClient implements IClient {
+
+    private EndPoint endPoint;
+
+    private IEC102RxtxClientHandler defaultHandler;
+
+    private EventLoopGroup eventLoop;
+
+    private volatile Channel channel;
+
+    private volatile boolean disconnected = false;
+    private volatile boolean reconnect = false;
+
+
+    private int status;
+
+    public IEC102SerialClient(EndPoint endPoint1) {
+        this.endPoint = endPoint1;
+    }
+
+    public EndPoint getEndPoint() {
+        return endPoint;
+    }
+
+    @Override
+    public Future<Integer> connect() {
+        return null;
+    }
+
+    @Override
+    public Future<Integer> reconnect() {
+        return null;
+    }
+
+    @Override
+    public void disconnect() {
+
+    }
+
+    @Override
+    public boolean isConnected() {
+        return false;
+    }
+
+    @Override
+    public EventLoopGroup getEventLoop() {
+        return null;
+    }
+
+    @Override
+    public Future<Integer> sendMessage(GYMessage gyMessage) {
+        return null;
+    }
+
+    @Override
+    public Future<Integer> send(Byte[] bytes) {
+        return null;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public void createRxtx(IEC102RxtxClientHandler handler, String portName) throws Exception {
+        System.out.println(portName);
+        //串口使用阻塞io
+        EventLoopGroup group = new OioEventLoopGroup();
+        try {
+            Bootstrap bootstrap  = new Bootstrap();
+            bootstrap.group(group)
+                    .channelFactory(() -> {
+                        RxtxChannel rxtxChannel = new RxtxChannel();
+                        rxtxChannel.config()
+                                .setBaudrate(9600)
+                                .setDatabits(RxtxChannelConfig.Databits.DATABITS_8)
+                                .setParitybit(RxtxChannelConfig.Paritybit.NONE)
+                                .setStopbits(RxtxChannelConfig.Stopbits.STOPBITS_1);
+                        return rxtxChannel ;
+                    })
+                    .handler(new ChannelInitializer<RxtxChannel>() {
+                        @Override
+                        protected void initChannel(RxtxChannel rxtxChannel) {
+                            rxtxChannel.pipeline().addLast(
+                                    new LineBasedFrameDecoder(60000),
+                                    new StringEncoder(StandardCharsets.UTF_8),
+                                    new StringDecoder(StandardCharsets.UTF_8),
+                                    handler
+                            );
+                        }
+                    });
+
+            ChannelFuture f = bootstrap.connect(new RxtxDeviceAddress(portName)).sync();
+            f.channel().closeFuture().sync();
+        } finally {
+            group.shutdownGracefully();
+        }
+    }
+
+    public void start() {
+
+    }
+
+    public void start(IEC102RxtxClientHandler handler, String portName){
+        CompletableFuture.runAsync(()->{
+            try {
+                //阻塞的函数
+                createRxtx(handler,portName);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }, Executors.newSingleThreadExecutor());//不传默认使用ForkJoinPool,都是守护线程
+    }
+
+    public void stop() {
+
+    }
+
+    public void restart() {
+
+    }
+
+}

+ 50 - 0
gateway/src/main/java/com/gyee/edge/gateway/client/IEC102TcpClient.java

@@ -0,0 +1,50 @@
+package com.gyee.edge.gateway.client;
+
+
+import com.gyee.edge.gateway.message.GYMessage;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+
+public class IEC102TcpClient implements IClient {
+
+
+    @Override
+    public EndPoint getEndPoint() {
+        return null;
+    }
+
+    @Override
+    public Future<Integer> connect() {
+        return null;
+    }
+
+    @Override
+    public Future<Integer> reconnect() {
+        return null;
+    }
+
+    @Override
+    public void disconnect() {
+
+    }
+
+    @Override
+    public boolean isConnected() {
+        return false;
+    }
+
+    @Override
+    public EventLoopGroup getEventLoop() {
+        return null;
+    }
+
+    @Override
+    public Future<Integer> sendMessage(GYMessage gyMessage) {
+        return null;
+    }
+
+    @Override
+    public Future<Integer> send(Byte[] bytes) {
+        return null;
+    }
+}

+ 38 - 0
gateway/src/main/java/com/gyee/edge/gateway/client/IEC102TcpMaster.java

@@ -0,0 +1,38 @@
+package com.gyee.edge.gateway.client;
+
+
+public class IEC102TcpMaster implements IClient {
+
+    private EndPoint endPoint;
+
+    private int status;
+
+    public IEC102TcpMaster(EndPoint endPoint1) {
+        this.endPoint = endPoint1;
+    }
+
+    public EndPoint getEndPoint() {
+        return endPoint;
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+
+
+
+    public void start() {
+
+    }
+
+
+    public void stop() {
+
+    }
+
+    public void restart() {
+
+    }
+
+}

+ 28 - 0
gateway/src/main/java/com/gyee/edge/gateway/config/cache/CacheService.java

@@ -0,0 +1,28 @@
+package com.gyee.edge.gateway.config.cache;
+
+import com.gyee.edge.gateway.client.EndPoint;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+public class CacheService {
+
+    private List<EndPoint> endPointList;
+
+    public List<EndPoint> getEndPointList() {
+        if (endPointList == null) {
+            endPointList = loadEndPoints();
+        }
+
+        return endPointList;
+    }
+
+    private List<EndPoint> loadEndPoints() {
+        //todo: 从数据库中加载endpoint
+        return new ArrayList<>();
+    }
+
+
+}