// PushCallback.java (2.6 KB)
  1. package com.gyee.dataadapter.config;
  2. import com.gyee.dataadapter.cache.MqttCache;
  3. import com.gyee.dataadapter.entity.PointInfo;
  4. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  5. import org.eclipse.paho.client.mqttv3.MqttCallback;
  6. import org.eclipse.paho.client.mqttv3.MqttMessage;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import java.time.LocalDateTime;
  10. import java.util.*;
  11. /**
  12. * 主要用来接收和处理订阅主题的消息
  13. */
  14. public class PushCallback implements MqttCallback {
  15. private static final Logger LOGGER = LoggerFactory.getLogger(PushCallback.class);
  16. private MqttServer mqttServer;
  17. public PushCallback(MqttServer mqttServer) {
  18. this.mqttServer = mqttServer;
  19. }
  20. public void connectionLost(Throwable cause) {
  21. // 连接丢失后,一般在这里面进行重连
  22. LOGGER.info("---------------------连接断开,可以做重连");
  23. mqttServer.subsribeConnect();
  24. while (true) {
  25. try {
  26. //如果没有发生异常说明连接成功,如果发生异常,则死循环
  27. Thread.sleep(1000);
  28. break;
  29. } catch (Exception e) {
  30. continue;
  31. }
  32. }
  33. }
  34. /**
  35. * 发送消息,消息到达后处理方法
  36. *
  37. * @param token
  38. */
  39. public void deliveryComplete(IMqttDeliveryToken token) {
  40. System.out.println("deliveryComplete---------" + token.isComplete());
  41. }
  42. /**
  43. * 接收所订阅的主题的消息并处理
  44. *
  45. * @param topic
  46. * @param message
  47. */
  48. public void messageArrived2(String topic, MqttMessage message) throws Exception {
  49. // subscribe后得到的消息会执行到这里面
  50. String result = new String(message.getPayload(), "UTF-8");
  51. System.out.println("接收消息主题 : " + topic);
  52. System.out.println("接收消息Qos : " + message.getQos());
  53. System.out.println("接收消息内容 : " + result);
  54. System.out.println(message.getPayload().toString());
  55. //这里可以针对收到的消息做处理,比如持久化
  56. }
  57. public void messageArrived(String topic, MqttMessage message) throws Exception {
  58. DataConverterManager dm = new DataConverterManager(message.getPayload());
  59. System.out.print(LocalDateTime.now().getMinute()+",");
  60. List<PointInfo> pis = dm.getPintInfos();
  61. for (PointInfo pi :pis) {
  62. if (!MqttCache.subData.containsKey(pi.getPath())) {
  63. MqttCache.subData.put(pi.getPath(),pi);
  64. } else {
  65. MqttCache.subData.replace(pi.getPath(),pi);
  66. }
  67. }
  68. }
  69. }