package Mqtt.service;

import Mqtt.config.LocalCacheManager;
import Mqtt.config.MQTTConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Publisher-side MQTT logic.
 *
 * <p>On startup (when the publisher is enabled in {@link MQTTConfig}) this component connects a
 * Paho {@link MqttClient} and re-sends every message held by the {@link LocalCacheManager}.
 * Messages that cannot be published are handed to the cache manager so they can be re-sent on a
 * later start.
 */
@Component
@Slf4j
public class MqttClientManager {

    /** QoS applied to every outgoing message (candidate for a configuration property). */
    private static final int DEFAULT_QOS = 1;

    private final MQTTConfig config;
    private final LocalCacheManager cacheManager;
    private MqttClient client;

    /**
     * @param config       MQTT connection and publisher settings
     * @param cacheManager store for messages that could not be published
     */
    public MqttClientManager(MQTTConfig config, LocalCacheManager cacheManager) {
        this.config = config;
        this.cacheManager = cacheManager;
    }

    /**
     * Connects the publisher client and replays cached messages to the default topic.
     *
     * <p>Does nothing when the publisher is disabled. A connection failure is logged and not
     * rethrown; the client then stays disconnected and {@link #publish} caches instead of sending.
     */
    @PostConstruct
    public void init() {
        if (!config.isEnablePublisher()) {
            log.info("发布端初始化已禁用(enablePublisher=false)");
            return;
        }
        try {
            client = new MqttClient(
                    config.getHost(),
                    config.getPublisher().getClientId(),
                    new MemoryPersistence());
            client.connect(buildOptions());
            log.info("发布端连接成功:{}", config.getHost());
            replayCachedMessages();
        } catch (MqttException e) {
            log.error("发布端连接失败:{}", e.getMessage(), e);
        }
    }

    /**
     * Disconnects and closes the client when the bean is destroyed so the connection and the
     * client's resources are released. Failures are logged and not rethrown.
     */
    @PreDestroy
    public void shutdown() {
        if (client == null) {
            return;
        }
        try {
            // disconnect() is only valid on a connected client.
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (MqttException e) {
            log.warn("发布端关闭失败:{}", e.getMessage(), e);
        }
    }

    /**
     * Publishes {@code payload} (UTF-8 encoded) to {@code topic}.
     *
     * <p>If the client is not connected or publishing fails, the payload is handed to the cache
     * manager instead; no exception is propagated for those cases. A {@code null} payload is
     * ignored, because caching it would make every later replay fail on the same entry.
     *
     * @param topic   destination topic
     * @param payload message text; {@code null} is logged and dropped
     */
    public void publish(String topic, String payload) {
        if (payload == null) {
            log.warn("忽略空消息(payload 为 null),topic={}", topic);
            return;
        }
        if (!send(topic, payload)) {
            cacheManager.cache(payload);
        }
    }

    /**
     * Re-sends every cached message to the publisher's default topic.
     *
     * <p>The cache is left untouched while sending and is cleared only afterwards; messages that
     * failed are then put back. Clearing after a replay that went through {@link #publish} would
     * instead wipe the messages that had just been re-cached on failure.
     */
    private void replayCachedMessages() {
        List<String> cached = cacheManager.loadAll();
        if (cached == null || cached.isEmpty()) {
            return;
        }
        // Copy first: the returned list may be backed by the cache itself.
        List<String> snapshot = new ArrayList<>(cached);
        String topic = config.getPublisher().getDefaultTopic();

        List<String> failed = new ArrayList<>();
        for (String message : snapshot) {
            if (message == null) {
                continue; // unsendable entry; drop it rather than re-cache it forever
            }
            if (!send(topic, message)) {
                failed.add(message);
            }
        }

        cacheManager.clearAll();
        for (String message : failed) {
            cacheManager.cache(message);
        }
    }

    /**
     * Attempts a single publish without touching the cache.
     *
     * @return {@code true} if the message was handed to the client successfully, {@code false} if
     *         the client is not connected or publishing threw
     */
    private boolean send(String topic, String payload) {
        try {
            if (client == null || !client.isConnected()) {
                log.warn("发布端未连接,缓存消息...");
                return false;
            }
            MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
            message.setQos(DEFAULT_QOS);
            client.publish(topic, message);
            log.info("消息已发布到 {}:{}", topic, payload);
            return true;
        } catch (Exception e) {
            log.error("发布失败,缓存消息", e);
            return false;
        }
    }

    /**
     * Builds the connect options from {@link MQTTConfig}.
     *
     * <p>The password is only applied when one is configured, so a configuration without
     * credentials no longer fails with a {@link NullPointerException} during startup.
     */
    private MqttConnectOptions buildOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(config.isCleansession());
        options.setUserName(config.getUsername());
        String password = config.getPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }
        options.setKeepAliveInterval(config.getKeepalive());
        options.setConnectionTimeout(config.getConnectionTimeout());
        return options;
    }
}