本项目是一个 基于 Spring Boot + Eclipse Paho MQTT 的设备数据采集和上报系统,
具备完整的 MQTT 数据收发、消息格式封装、断点续传、本地缓存、配置化等能力。
下面是详细梳理与分析:
一、系统主要功能总览
功能 |
说明 |
1. MQTT 发布 |
向指定 topic 发布设备数据(支持 JSON 格式封装) |
2. MQTT 订阅 |
启动时或手动订阅配置 topic,接收设备端上报的数据 |
3. 数据封装转发 |
接收到数据后进行 JSON 解析与重组,再转发到管理平台 topic |
4. 断点续传 |
连接中断时自动缓存消息,恢复后自动补发 |
5. 接口调用上传 |
提供 REST 接口上传任意 JSON 数据到 MQTT |
6. 配置自动加载 |
支持通过 application.yml 完全配置化管理 |
7. 日志追踪 |
使用 SLF4J 打印操作日志,便于调试和运维 |
8. 本地缓存管理 |
可清理/加载未发送的数据,缓存路径可配置 |
二、核心模块结构说明
- MQTTConfig
加载 application.yml 配置,包括主机、clientId、topic、qos 等。
子类 SubscribeConfig 专用于订阅 topic 配置。
- MqttConnect
封装生成 MqttConnectOptions 的逻辑。
包括设置用户名、密码、心跳、超时等参数。
- MQTTServer
核心服务类:管理发布和订阅端连接。
包含:
publishConnect() 发布端连接
subsribeConnect() 订阅端连接
init(topic, qos) 初始化订阅
sendMQTTMessage(topic, data, qos) 发布 JSON 消息
handleIncomingMessage(topic, payload) 处理收到的消息并转发
- PushCallback
MQTT 消息回调处理类,实现 MqttCallback
接收到数据后,检查字段完整性并进行 JSON 拆解、封装,上送管理平台 topic。
- MqttController
对外暴露 REST 接口:
/mqtt/upload:POST 上传 JSON 数据,转发到默认 topic
/mqtt/test-publish:GET 发送任意消息
/mqtt/manual-subscribe:GET 手动订阅任意 topic
启动时自动调用配置的订阅 topic(通过 ApplicationRunner)
- MqttClientManager
辅助管理 MQTT 发布:
缓存失败消息(写入本地 JSON 文件)
断点续传,恢复后自动补发未发送数据
自动重连 MQTT 服务器
- LocalCacheManager
简化封装本地缓存逻辑
cache():缓存消息到文件
loadAll():读取所有未发送消息
clearAll():清空缓存目录
- JsonFormatter
使用 Gson 美化 JSON 输出(可选)
- DataMessage
JSON 数据模型类
定义接收/上传数据结构,用于结构化映射 JSON
四、典型使用流程
1、启动应用
自动连接 MQTT
自动订阅配置中的 topic
2、设备上传数据
终端通过 MQTT 向 device/topic 发送 JSON 格式数据
3、系统接收数据
PushCallback 解析数据,封装成平台需要格式
4、系统转发到管理平台
向 management/data/upload topic 发布转发后的数据
5、断网处理
断开时会缓存数据文件,重连后再上送
6、接口测试
手动测试 MQTT 发布 /mqtt/test-publish
REST 接口上送数据 /mqtt/upload