// MqttClientManager.java (2.7 KB)
  package Mqtt.service;

  import Mqtt.config.LocalCacheManager;
  import Mqtt.config.MQTTConfig;
  import lombok.extern.slf4j.Slf4j;
  import org.eclipse.paho.client.mqttv3.*;
  import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  import org.springframework.stereotype.Component;

  import javax.annotation.PostConstruct;
  import javax.annotation.PreDestroy;
  import java.nio.charset.StandardCharsets;
  import java.util.ArrayList;
  import java.util.List;
  11. /**
  12. * 发布端逻辑
  13. */
  14. @Component
  15. @Slf4j
  16. public class MqttClientManager {
  17. private final MQTTConfig config;
  18. private final LocalCacheManager cacheManager;
  19. private MqttClient client;
  20. public MqttClientManager(MQTTConfig config, LocalCacheManager cacheManager) {
  21. this.config = config;
  22. this.cacheManager = cacheManager;
  23. }
  24. @PostConstruct
  25. public void init() {
  26. if (!config.isEnablePublisher()) {
  27. log.info("发布端初始化已禁用(enablePublisher=false)");
  28. return;
  29. }
  30. try {
  31. client = new MqttClient(config.getHost(), config.getPublisher().getClientId(), new MemoryPersistence());
  32. MqttConnectOptions options = buildOptions();
  33. client.connect(options);
  34. log.info("发布端连接成功:{}", config.getHost());
  35. List<String> cachedMessages = cacheManager.loadAll();
  36. for (String message : cachedMessages) {
  37. publish(config.getPublisher().getDefaultTopic(), message);
  38. }
  39. cacheManager.clearAll();
  40. } catch (MqttException e) {
  41. log.error("发布端连接失败:{}", e.getMessage(), e);
  42. }
  43. }
  44. public void publish(String topic, String payload) {
  45. try {
  46. if (client == null || !client.isConnected()) {
  47. log.warn("发布端未连接,缓存消息...");
  48. cacheManager.cache(payload);
  49. return;
  50. }
  51. MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
  52. message.setQos(1); // 可配置
  53. client.publish(topic, message);
  54. log.info("消息已发布到 {}:{}", topic, payload);
  55. } catch (Exception e) {
  56. log.error("发布失败,缓存消息", e);
  57. cacheManager.cache(payload);
  58. }
  59. }
  60. private MqttConnectOptions buildOptions() {
  61. MqttConnectOptions options = new MqttConnectOptions();
  62. options.setCleanSession(config.isCleansession());
  63. options.setUserName(config.getUsername());
  64. options.setPassword(config.getPassword().toCharArray());
  65. options.setKeepAliveInterval(config.getKeepalive());
  66. options.setConnectionTimeout(config.getConnectionTimeout());
  67. return options;
  68. }
  69. }