王波 3 тижнів тому
коміт
a073495549

+ 75 - 0
Mqtt.md

@@ -0,0 +1,75 @@
+
+本项目是一个 基于 Spring Boot + Eclipse Paho MQTT 的设备数据采集和上报系统,
+具备完整的 MQTT 数据收发、消息格式封装、断点续传、本地缓存、配置化等能力。
+下面是详细梳理与分析:
+
+一、系统主要功能总览
+
+| 功能             | 说明                                 |
+| -------------- | ---------------------------------- |
+| **1. MQTT 发布** | 向指定 topic 发布设备数据(支持 JSON 格式封装)     |
+| **2. MQTT 订阅** | 启动时或手动订阅配置 topic,接收设备端上报的数据        |
+| **3. 数据封装转发**  | 接收到数据后进行 JSON 解析与重组,再转发到管理平台 topic |
+| **4. 断点续传**    | 连接中断时自动缓存消息,恢复后自动补发                |
+| **5. 接口调用上传**  | 提供 REST 接口上传任意 JSON 数据到 MQTT       |
+| **6. 配置自动加载**  | 支持通过 application.yml 完全配置化管理       |
+| **7. 日志追踪**    | 使用 SLF4J 打印操作日志,便于调试和运维            |
+| **8. 本地缓存管理**  | 可清理/加载未发送的数据,缓存路径可配置               |
+
+
+二、核心模块结构说明
+1. MQTTConfig
+   加载 application.yml 配置,包括主机、clientId、topic、qos 等。
+   子类 SubscribeConfig 专用于订阅 topic 配置。
+2. MqttConnect
+   封装生成 MqttConnectOptions 的逻辑。
+   包括设置用户名、密码、心跳、超时等参数。
+3. MQTTServer
+   核心服务类:管理发布和订阅端连接。
+   包含:
+     publishConnect() 发布端连接
+     subsribeConnect() 订阅端连接
+     init(topic, qos) 初始化订阅
+     sendMQTTMessage(topic, data, qos) 发布 JSON 消息
+     handleIncomingMessage(topic, payload) 处理收到的消息并转发
+4. PushCallback
+   MQTT 消息回调处理类,实现 MqttCallback
+   接收到数据后,检查字段完整性并进行 JSON 拆解、封装,上送管理平台 topic。
+5. MqttController
+   对外暴露 REST 接口:
+      /mqtt/upload:POST 上传 JSON 数据,转发到默认 topic
+      /mqtt/test-publish:GET 发送任意消息
+      /mqtt/manual-subscribe:GET 手动订阅任意 topic
+   启动时自动调用配置的订阅 topic(通过 ApplicationRunner)
+6. MqttClientManager
+   辅助管理 MQTT 发布:
+    缓存失败消息(写入本地 JSON 文件)
+    断点续传,恢复后自动补发未发送数据
+   自动重连 MQTT 服务器
+7. LocalCacheManager
+   简化封装本地缓存逻辑
+   cache():缓存消息到文件
+   loadAll():读取所有未发送消息
+   clearAll():清空缓存目录
+8. JsonFormatter
+   使用 Gson 美化 JSON 输出(可选)
+9. DataMessage
+   JSON 数据模型类
+   定义接收/上传数据结构,用于结构化映射 JSON
+
+四、典型使用流程
+
+1、启动应用
+    自动连接 MQTT
+    自动订阅配置中的 topic
+2、设备上传数据
+    终端通过 MQTT 向 device/topic 发送 JSON 格式数据
+3、系统接收数据
+    PushCallback 解析数据,封装成平台需要格式 
+4、系统转发到管理平台
+    向 management/data/upload topic 发布转发后的数据
+5、断网处理
+    断开时会缓存数据文件,重连后再上送
+6、接口测试
+    手动测试 MQTT 发布 /mqtt/test-publish
+    REST 接口上送数据 /mqtt/upload

+ 81 - 0
pom.xml

@@ -0,0 +1,81 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.example</groupId>
+    <artifactId>mqtt-uploader</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <name>Archetype - mqtt-uploader</name>
+    <url>http://maven.apache.org</url>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+            <version>2.7.5</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.5</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.10</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-core</artifactId>
+            <version>5.5.4</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-integration</artifactId>
+            <version>2.7.5</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+            <version>5.5.4</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.15.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.83</version>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.30</version>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+    <properties>
+        <java.version>1.8</java.version>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+</project>

+ 12 - 0
src/main/java/Mqtt/MqttUploaderApplication.java

@@ -0,0 +1,12 @@
+package Mqtt;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class MqttUploaderApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(MqttUploaderApplication.class, args);
+    }
+
+}

+ 28 - 0
src/main/java/Mqtt/config/DataMessage.java

@@ -0,0 +1,28 @@
+package Mqtt.config;
+
+import java.util.List;
+
+/**
+ *  JSON 数据模型
+ */
+public class DataMessage {
+    public List<Device> devices;
+
+    public static class Device {
+        public String deviceId;
+        public List<Service> services;
+    }
+
+    public static class Service {
+        public Data data;
+        public String eventTime;
+        public String serviceId;
+    }
+
+    public static class Data {
+        public String station_no;
+        public long time;
+        public String d_id;
+        public java.util.Map<String, Object> d_data;
+    }
+}

+ 16 - 0
src/main/java/Mqtt/config/JsonFormatter.java

@@ -0,0 +1,16 @@
+package Mqtt.config;
+
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * JSON 格式化工具
+ */
+public class JsonFormatter {
+    private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
+
+    public static String toJson(DataMessage msg) {
+        return gson.toJson(msg);
+    }
+}

+ 80 - 0
src/main/java/Mqtt/config/LocalCacheManager.java

@@ -0,0 +1,80 @@
+package Mqtt.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.*;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * 本地文件缓存工具类,用来存储 MQTT 未能成功发送的消息,并在连接恢复后重新读取和发送。
+ */
+@Component
+public class LocalCacheManager {
+
+    private final Path cacheDir;
+
+    /**
+     * 构造函数,创建缓存目录。
+     *指定缓存目录为 "mqtt-cache"(可在配置文件中外部化)。
+     * 所有缓存的消息都以 JSON 文件的形式保存在这个目录
+     * @param cachePath 缓存目录路径
+     * @throws IOException 创建目录时发生 IO 异常
+     */
+    public LocalCacheManager(@Value("${cache.path:./cache}") String cachePath) throws IOException {
+        this.cacheDir = Paths.get(cachePath);
+        Files.createDirectories(cacheDir);
+    }
+
+    /**
+     * 缓存一条消息。
+     * @param json 要缓存的消息,以 JSON 格式表示
+     */
+    public void cache(String json) {
+        try {
+            String file = "msg-" + System.currentTimeMillis() + ".json";
+            Files.write(cacheDir.resolve(file), json.getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 加载所有缓存的消息。
+     * @return 所有缓存的消息,以 JSON 格式表示
+     */
+    public List<String> loadAll() {
+        try (Stream<Path> files = Files.list(cacheDir)) {
+            return files
+                    .filter(p -> p.toString().endsWith(".json"))
+                    .map(p -> {
+                        try {
+                            return new String(Files.readAllBytes(p), StandardCharsets.UTF_8);
+                        } catch (IOException e) {
+                            return null;
+                        }
+                    })
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            e.printStackTrace();
+            return Collections.emptyList();
+        }
+    }
+
+    /**
+     * 清空所有缓存的消息。
+     */
+    public void clearAll() {
+        try (Stream<Path> files = Files.list(cacheDir)) {
+            files.forEach(p -> p.toFile().delete());
+        } catch (IOException ignored) {}
+    }
+}
+

+ 37 - 0
src/main/java/Mqtt/config/MQTTConfig.java

@@ -0,0 +1,37 @@
+package Mqtt.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@Component
+@ConfigurationProperties(prefix = "mqtt")
+@Data
+public class MQTTConfig {
+
+    private String host;
+    private String username;
+    private String password;
+    private boolean cleansession;
+    private int keepalive;
+    private int connectionTimeout;
+    private boolean enablePublisher;
+
+    private Publisher publisher = new Publisher();
+    private Subscriber subscriber = new Subscriber();
+
+    @Data
+    public static class Publisher {
+        private String clientId;
+        private String defaultTopic;
+    }
+
+    @Data
+    public static class Subscriber {
+        private String clientId;
+        private String defaultTopic;
+    }
+}
+
+
+

+ 40 - 0
src/main/java/Mqtt/config/MqttConnect.java

@@ -0,0 +1,40 @@
+package Mqtt.config;
+
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * 创建MqttConnectOptions连接对象
+ */
+@Component
+public class MqttConnect {
+
+    @Autowired
+    private MQTTConfig config;
+
+    public MqttConnect(MQTTConfig config) {
+        this.config = config;
+    }
+
+    public MqttConnectOptions getOptions() {
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setCleanSession(config.isCleansession());
+        options.setUserName(config.getUsername());
+        options.setPassword(config.getPassword().toCharArray());
+        options.setConnectionTimeout(config.getConnectionTimeout());
+        //设置心跳
+        options.setKeepAliveInterval(config.getKeepalive());
+        return options;
+    }
+
+    public MqttConnectOptions getOptions(MqttConnectOptions options) {
+
+        options.setCleanSession(options.isCleanSession());
+        options.setUserName(options.getUserName());
+        options.setPassword(options.getPassword());
+        options.setConnectionTimeout(options.getConnectionTimeout());
+        options.setKeepAliveInterval(options.getKeepAliveInterval());
+        return options;
+    }
+}

+ 52 - 0
src/main/java/Mqtt/config/PushCallback.java

@@ -0,0 +1,52 @@
+package Mqtt.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+import java.util.Base64;
+
+@Slf4j
+public class PushCallback implements MqttCallback {
+
+    private final LocalCacheManager cacheManager;
+
+    public PushCallback(LocalCacheManager cacheManager) {
+        this.cacheManager = cacheManager;
+    }
+
+    @Override
+    public void connectionLost(Throwable cause) {
+        log.warn("MQTT连接丢失:{}", cause.getMessage());
+    }
+
+    @Override
+    public void messageArrived(String topic, MqttMessage message) {
+        byte[] payloadBytes = message.getPayload(); // 原始二进制数据
+
+        // 打印原始字节数据(十六进制)
+        log.info("收到订阅消息,主题:{},原始字节:{}", topic, bytesToHex(payloadBytes));
+
+        // 可选:编码为 Base64 以供缓存或上传
+        String base64Payload = Base64.getEncoder().encodeToString(payloadBytes);
+        log.info("Base64 编码后:{}", base64Payload);
+
+        // 缓存 base64 表达的数据(可替换为你自己的处理逻辑)
+        cacheManager.cache(base64Payload);
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        log.debug("消息发送完成");
+    }
+
+    // 辅助方法:转 16 进制字符串用于日志打印
+    private String bytesToHex(byte[] bytes) {
+        StringBuilder sb = new StringBuilder();
+        for (byte b : bytes) {
+            sb.append(String.format("%02X ", b));
+        }
+        return sb.toString().trim();
+    }
+}

+ 79 - 0
src/main/java/Mqtt/controller/MqttController.java

@@ -0,0 +1,79 @@
+package Mqtt.controller;
+
+import Mqtt.config.MQTTConfig;
+import Mqtt.service.MQTTServer;
+import com.google.gson.Gson;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * MQTT 控制器,负责消息发布与订阅接口
+ */
+@RestController
+@RequestMapping("/mqtt")
+public class MqttController implements ApplicationRunner {
+
+    @Autowired
+    private MQTTServer mqttServer;
+
+    @Autowired
+    private Gson gson;
+
+    @Autowired
+    private MQTTConfig mqttConfig;
+
+    /**
+     * 应用启动后自动订阅配置中的主题
+     */
+    @Override
+    public void run(ApplicationArguments args) {
+        CompletableFuture.runAsync(() -> {
+            String topic = mqttConfig.getSubscriber().getDefaultTopic();
+            mqttServer.init(topic, 1);  // 你也可以从 config.getSubscriber().getQos() 获取
+            System.out.println("自动订阅:" + topic);
+        });
+    }
+
+    /**
+     * 接收终端数据并上送(POST JSON)
+     */
+    @PostMapping("/upload")
+    public ResponseEntity<String> upload(@RequestBody Map<String, Object> raw) {
+        String json = gson.toJson(raw);
+        mqttServer.sendMQTTMessage(
+                mqttConfig.getPublisher().getDefaultTopic(),
+                json,
+                1  // 默认 QoS 也可以从 config 中读取
+        );
+        return ResponseEntity.ok("数据已上送");
+    }
+
+    /**
+     * 测试接口:发送消息到任意 Topic
+     */
+    @GetMapping("/test-publish")
+    public ResponseEntity<String> testPublish(
+            @RequestParam String topic,
+            @RequestParam String msg,
+            @RequestParam(defaultValue = "1") int qos) {
+        mqttServer.sendMQTTMessage(topic, msg, qos);
+        return ResponseEntity.ok("已发送到主题: '" + topic + "',内容: " + msg + ",QoS: " + qos);
+    }
+
+    /**
+     * 手动订阅指定主题
+     */
+    @GetMapping("/manual-subscribe")
+    public ResponseEntity<String> manualSubscribe(
+            @RequestParam String topic,
+            @RequestParam(defaultValue = "1") int qos) {
+        mqttServer.init(topic, qos);
+        return ResponseEntity.ok("已手动订阅主题: '" + topic + "',QoS: " + qos);
+    }
+}

+ 34 - 0
src/main/java/Mqtt/service/MQTTServer.java

@@ -0,0 +1,34 @@
+package Mqtt.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+ * 统一协调中间层,由 MqttController 调用
+ */
+@Service
+@Slf4j
+public class MQTTServer {
+
+    private final MqttClientManager publisher;
+    private final MqttSubscriberManager subscriber;
+
+    public MQTTServer(MqttClientManager publisher, MqttSubscriberManager subscriber) {
+        this.publisher = publisher;
+        this.subscriber = subscriber;
+    }
+
+    /**
+     * 发送 MQTT 消息
+     */
+    public void sendMQTTMessage(String topic, String payload, int qos) {
+        publisher.publish(topic, payload);
+    }
+
+    /**
+     * 初始化订阅(默认或自定义)
+     */
+    public void init(String topic, int qos) {
+        subscriber.subscribe(topic, qos);
+    }
+}

+ 84 - 0
src/main/java/Mqtt/service/MqttClientManager.java

@@ -0,0 +1,84 @@
+package Mqtt.service;
+
+import Mqtt.config.LocalCacheManager;
+import Mqtt.config.MQTTConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * 发布端逻辑
+ */
+@Component
+@Slf4j
+public class MqttClientManager {
+
+    private final MQTTConfig config;
+    private final LocalCacheManager cacheManager;
+    private MqttClient client;
+
+    public MqttClientManager(MQTTConfig config, LocalCacheManager cacheManager) {
+        this.config = config;
+        this.cacheManager = cacheManager;
+    }
+
+    @PostConstruct
+    public void init() {
+        if (!config.isEnablePublisher()) {
+            log.info("发布端初始化已禁用(enablePublisher=false)");
+            return;
+        }
+
+        try {
+            client = new MqttClient(config.getHost(), config.getPublisher().getClientId(), new MemoryPersistence());
+
+            MqttConnectOptions options = buildOptions();
+            client.connect(options);
+            log.info("发布端连接成功:{}", config.getHost());
+
+            List<String> cachedMessages = cacheManager.loadAll();
+            for (String message : cachedMessages) {
+                publish(config.getPublisher().getDefaultTopic(), message);
+            }
+            cacheManager.clearAll();
+
+        } catch (MqttException e) {
+            log.error("发布端连接失败:{}", e.getMessage(), e);
+        }
+    }
+
+
+    public void publish(String topic, String payload) {
+        try {
+            if (client == null || !client.isConnected()) {
+                log.warn("发布端未连接,缓存消息...");
+                cacheManager.cache(payload);
+                return;
+            }
+            MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
+            message.setQos(1); // 可配置
+            client.publish(topic, message);
+            log.info("消息已发布到 {}:{}", topic, payload);
+        } catch (Exception e) {
+            log.error("发布失败,缓存消息", e);
+            cacheManager.cache(payload);
+        }
+    }
+
+    private MqttConnectOptions buildOptions() {
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setCleanSession(config.isCleansession());
+        options.setUserName(config.getUsername());
+        options.setPassword(config.getPassword().toCharArray());
+        options.setKeepAliveInterval(config.getKeepalive());
+        options.setConnectionTimeout(config.getConnectionTimeout());
+        return options;
+    }
+}
+
+

+ 73 - 0
src/main/java/Mqtt/service/MqttSubscriberManager.java

@@ -0,0 +1,73 @@
+package Mqtt.service;
+
+import Mqtt.config.LocalCacheManager;
+import Mqtt.config.MQTTConfig;
+import Mqtt.config.PushCallback;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.stereotype.Component;
+
+/**
+ * 订阅端逻辑
+ */
+@Component
+@Slf4j
+public class MqttSubscriberManager {
+
+    private final MQTTConfig config;
+    private final LocalCacheManager cacheManager;
+    private MqttClient client;
+
+    public MqttSubscriberManager(MQTTConfig config, LocalCacheManager cacheManager) {
+        this.config = config;
+        this.cacheManager = cacheManager;
+        connect();
+    }
+
+    private void connect() {
+        try {
+            String topic = config.getSubscriber().getDefaultTopic();
+            if (topic == null) {
+                throw new IllegalArgumentException("订阅主题 defaultTopic 不能为空,请检查配置文件!");
+            }
+
+
+            client = new MqttClient(config.getHost(), config.getSubscriber().getClientId(), new MemoryPersistence());
+            client.setCallback(new PushCallback(cacheManager));
+
+            MqttConnectOptions options = new MqttConnectOptions();
+            options.setCleanSession(config.isCleansession());
+            options.setUserName(config.getUsername());
+            options.setPassword(config.getPassword().toCharArray());
+            options.setKeepAliveInterval(config.getKeepalive());
+            options.setConnectionTimeout(config.getConnectionTimeout());
+
+            client.connect(options);
+            client.subscribe(config.getSubscriber().getDefaultTopic());
+
+            log.info("订阅端连接成功并订阅:{}", config.getSubscriber().getDefaultTopic());
+
+        } catch (MqttException e) {
+            log.error("订阅端连接失败:{}", e.getMessage(), e);
+        }
+    }
+
+
+    public void subscribe(String topic, int qos) {
+        try {
+            if (client != null && client.isConnected()) {
+                client.subscribe(topic, qos);
+                log.info("手动订阅成功:{} (QoS: {})", topic, qos);
+            } else {
+                log.warn("订阅端尚未连接,无法订阅:{}", topic);
+            }
+        } catch (MqttException e) {
+            log.error("订阅主题失败:{}", topic, e);
+        }
+    }
+
+}
+

+ 35 - 0
src/main/resources/application.yaml

@@ -0,0 +1,35 @@
+server:
+  port: 8080
+
+# MQTT 连接的基本配置
+mqtt:
+#  host: tcp://localhost:1883             # MQTT 服务器地址(含协议、主机名、端口)
+  host: tcp://10.220.1.5:2883
+  username: admin@scada.com              # 连接用的用户名
+  password: Scada135}+?                  # 连接用的密码
+  cleansession: true                     # 是否清除会话(true 表示每次重新连接时不保留旧会话)
+  keepalive: 60                          # 保活时间(秒),服务器多久没收到消息断开连接
+  connection-timeout: 10                 # 连接超时时间(秒)
+  enable-publisher: false
+  # 发布端配置
+  publisher:
+    clientid: mqtt-publisher-client      # 发布端客户端ID(需唯一)
+    default-topic: pub/topic             # 默认发布的主题(可在代码中覆盖)
+    qos: 0                               # 消息服务质量等级(0=至多一次,1=至少一次,2=只有一次)
+
+  # 订阅端配置
+  subscriber:
+#    clientid: mqtt-subscriber-client     # 订阅端客户端ID(需唯一,与发布端不能重复)
+    clientid: HUIANTOGUANGYAO     # 订阅端客户端ID(需唯一,与发布端不能重复)
+    default-topic: scada/public/HUIANTOGUANGYAO/tag_values                   # 默认订阅的主题
+
+    qos: 0
+# 本地缓存配置
+cache:
+  path: ./cache                          # 用于断点续传的本地缓存目录路径(相对或绝对路径均可)
+
+
+
+
+
+