123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- package Mqtt.service;
- import Mqtt.config.LocalCacheManager;
- import Mqtt.config.MQTTConfig;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import java.nio.charset.StandardCharsets;
- import java.util.List;
- /**
- * 发布端逻辑
- */
- @Component
- @Slf4j
- public class MqttClientManager {
- private final MQTTConfig config;
- private final LocalCacheManager cacheManager;
- private MqttClient client;
- public MqttClientManager(MQTTConfig config, LocalCacheManager cacheManager) {
- this.config = config;
- this.cacheManager = cacheManager;
- }
- @PostConstruct
- public void init() {
- if (!config.isEnablePublisher()) {
- log.info("发布端初始化已禁用(enablePublisher=false)");
- return;
- }
- try {
- client = new MqttClient(config.getHost(), config.getPublisher().getClientId(), new MemoryPersistence());
- MqttConnectOptions options = buildOptions();
- client.connect(options);
- log.info("发布端连接成功:{}", config.getHost());
- List<String> cachedMessages = cacheManager.loadAll();
- for (String message : cachedMessages) {
- publish(config.getPublisher().getDefaultTopic(), message);
- }
- cacheManager.clearAll();
- } catch (MqttException e) {
- log.error("发布端连接失败:{}", e.getMessage(), e);
- }
- }
- public void publish(String topic, String payload) {
- try {
- if (client == null || !client.isConnected()) {
- log.warn("发布端未连接,缓存消息...");
- cacheManager.cache(payload);
- return;
- }
- MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
- message.setQos(1); // 可配置
- client.publish(topic, message);
- log.info("消息已发布到 {}:{}", topic, payload);
- } catch (Exception e) {
- log.error("发布失败,缓存消息", e);
- cacheManager.cache(payload);
- }
- }
- private MqttConnectOptions buildOptions() {
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(config.isCleansession());
- options.setUserName(config.getUsername());
- options.setPassword(config.getPassword().toCharArray());
- options.setKeepAliveInterval(config.getKeepalive());
- options.setConnectionTimeout(config.getConnectionTimeout());
- return options;
- }
- }
|