123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- package com.gyee.dataadapter.config;
- import com.gyee.dataadapter.cache.MqttCache;
- import com.gyee.dataadapter.entity.PointInfo;
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.time.LocalDateTime;
- import java.util.*;
- /**
- * 主要用来接收和处理订阅主题的消息
- */
- public class PushCallback implements MqttCallback {
- private static final Logger LOGGER = LoggerFactory.getLogger(PushCallback.class);
- private MqttServer mqttServer;
- public PushCallback(MqttServer mqttServer) {
- this.mqttServer = mqttServer;
- }
- public void connectionLost(Throwable cause) {
- // 连接丢失后,一般在这里面进行重连
- LOGGER.info("---------------------连接断开,可以做重连");
- mqttServer.subsribeConnect();
- while (true) {
- try {
- //如果没有发生异常说明连接成功,如果发生异常,则死循环
- Thread.sleep(1000);
- break;
- } catch (Exception e) {
- continue;
- }
- }
- }
- /**
- * 发送消息,消息到达后处理方法
- *
- * @param token
- */
- public void deliveryComplete(IMqttDeliveryToken token) {
- System.out.println("deliveryComplete---------" + token.isComplete());
- }
- /**
- * 接收所订阅的主题的消息并处理
- *
- * @param topic
- * @param message
- */
- public void messageArrived2(String topic, MqttMessage message) throws Exception {
- // subscribe后得到的消息会执行到这里面
- String result = new String(message.getPayload(), "UTF-8");
- System.out.println("接收消息主题 : " + topic);
- System.out.println("接收消息Qos : " + message.getQos());
- System.out.println("接收消息内容 : " + result);
- System.out.println(message.getPayload().toString());
- //这里可以针对收到的消息做处理,比如持久化
- }
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- DataConverterManager dm = new DataConverterManager(message.getPayload());
- System.out.print(LocalDateTime.now().getMinute()+",");
- List<PointInfo> pis = dm.getPintInfos();
- for (PointInfo pi :pis) {
- if (!MqttCache.subData.containsKey(pi.getPath())) {
- MqttCache.subData.put(pi.getPath(),pi);
- } else {
- MqttCache.subData.replace(pi.getPath(),pi);
- }
- }
- }
- }
|