本项目是一个 基于 Spring Boot + Eclipse Paho MQTT 的设备数据采集和上报系统, 具备完整的 MQTT 数据收发、消息格式封装、断点续传、本地缓存、配置化等能力。 下面是详细梳理与分析: 一、系统主要功能总览 | 功能 | 说明 | | -------------- | ---------------------------------- | | **1. MQTT 发布** | 向指定 topic 发布设备数据(支持 JSON 格式封装) | | **2. MQTT 订阅** | 启动时或手动订阅配置 topic,接收设备端上报的数据 | | **3. 数据封装转发** | 接收到数据后进行 JSON 解析与重组,再转发到管理平台 topic | | **4. 断点续传** | 连接中断时自动缓存消息,恢复后自动补发 | | **5. 接口调用上传** | 提供 REST 接口上传任意 JSON 数据到 MQTT | | **6. 配置自动加载** | 支持通过 application.yml 完全配置化管理 | | **7. 日志追踪** | 使用 SLF4J 打印操作日志,便于调试和运维 | | **8. 本地缓存管理** | 可清理/加载未发送的数据,缓存路径可配置 | 二、核心模块结构说明 1. MQTTConfig 加载 application.yml 配置,包括主机、clientId、topic、qos 等。 子类 SubscribeConfig 专用于订阅 topic 配置。 2. MqttConnect 封装生成 MqttConnectOptions 的逻辑。 包括设置用户名、密码、心跳、超时等参数。 3. MQTTServer 核心服务类:管理发布和订阅端连接。 包含: publishConnect() 发布端连接 subsribeConnect() 订阅端连接 init(topic, qos) 初始化订阅 sendMQTTMessage(topic, data, qos) 发布 JSON 消息 handleIncomingMessage(topic, payload) 处理收到的消息并转发 4. PushCallback MQTT 消息回调处理类,实现 MqttCallback 接收到数据后,检查字段完整性并进行 JSON 拆解、封装,上送管理平台 topic。 5. MqttController 对外暴露 REST 接口: /mqtt/upload:POST 上传 JSON 数据,转发到默认 topic /mqtt/test-publish:GET 发送任意消息 /mqtt/manual-subscribe:GET 手动订阅任意 topic 启动时自动调用配置的订阅 topic(通过 ApplicationRunner) 6. MqttClientManager 辅助管理 MQTT 发布: 缓存失败消息(写入本地 JSON 文件) 断点续传,恢复后自动补发未发送数据 自动重连 MQTT 服务器 7. LocalCacheManager 简化封装本地缓存逻辑 cache():缓存消息到文件 loadAll():读取所有未发送消息 clearAll():清空缓存目录 8. JsonFormatter 使用 Gson 美化 JSON 输出(可选) 9. DataMessage JSON 数据模型类 定义接收/上传数据结构,用于结构化映射 JSON 四、典型使用流程 1、启动应用 自动连接 MQTT 自动订阅配置中的 topic 2、设备上传数据 终端通过 MQTT 向 device/topic 发送 JSON 格式数据 3、系统接收数据 PushCallback 解析数据,封装成平台需要格式 4、系统转发到管理平台 向 management/data/upload topic 发布转发后的数据 5、断网处理 断开时会缓存数据文件,重连后再上送 6、接口测试 手动测试 MQTT 发布 /mqtt/test-publish REST 接口上送数据 /mqtt/upload