|
@@ -0,0 +1,198 @@
|
|
|
|
+package com.gyee.dataadapter.config;
|
|
|
|
+
|
|
|
|
+import org.eclipse.paho.client.mqttv3.*;
|
|
|
|
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ * 发布端:主要实现发布消息和订阅主题,接收消息在回调类PushCallback中
|
|
|
|
+ * 要发布消息的时候只需要调用sendMQTTMessage方法就OK了
|
|
|
|
+ */
|
|
|
|
+@Service
|
|
|
|
+public class MqttServer {
|
|
|
|
+ private static final Logger LOGGER = LoggerFactory.getLogger(MqttServer.class);
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ private MqttClient subsribeClient;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ * 发布者客户端对象
|
|
|
|
+ * 这里订阅者和发布者的MqttClient对象分别命名是为了让发布者和订阅者分开,
|
|
|
|
+ * 如果订阅者和发布者都用一个MqttClient链接对象,则会出现两方都订阅了某个主题后,
|
|
|
|
+ * 谁发送了消息,都会自己接收到自己发的消息,所以分开写,里面主要就是回调类的设置setCallback
|
|
|
|
+ */
|
|
|
|
+ private MqttClient publishClient;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ public MqttTopic topic;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ public MqttMessage message;
|
|
|
|
+
|
|
|
|
+ @Resource
|
|
|
|
+ private MqttConnect mqttConnect;
|
|
|
|
+
|
|
|
|
+ @Resource
|
|
|
|
+ private MqttConfig config;
|
|
|
|
+
|
|
|
|
+ public MqttServer() {
|
|
|
|
+ LOGGER.info("9011上线了");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ * 发布者客户端和服务端建立连接
|
|
|
|
+ */
|
|
|
|
+ public MqttClient publishConnect() {
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ if (publishClient == null) {
|
|
|
|
+
|
|
|
|
+ publishClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ MqttConnectOptions options = mqttConnect.getOptions();
|
|
|
|
+
|
|
|
|
+ if (!publishClient.isConnected()) {
|
|
|
|
+ publishClient.connect(options);
|
|
|
|
+ LOGGER.info("---------------------连接成功");
|
|
|
|
+ } else {
|
|
|
|
+ publishClient.disconnect();
|
|
|
|
+ publishClient.connect(mqttConnect.getOptions(options));
|
|
|
|
+ LOGGER.info("---------------------连接成功");
|
|
|
|
+ }
|
|
|
|
+ } catch (MqttException e) {
|
|
|
|
+ LOGGER.info(e.toString());
|
|
|
|
+ }
|
|
|
|
+ return publishClient;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ * 订阅端的链接方法,关键是回调类的设置,要对订阅的主题消息进行处理
|
|
|
|
+ * 断线重连方法,如果是持久订阅,重连时不需要再次订阅
|
|
|
|
+ * 如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true);
|
|
|
|
+ * true为非持久订阅
|
|
|
|
+ */
|
|
|
|
+ public void subsribeConnect() {
|
|
|
|
+ try {
|
|
|
|
+
|
|
|
|
+ if (subsribeClient == null) {
|
|
|
|
+
|
|
|
|
+ System.out.println("host: " + config.getHost() + " " + "客户端id: " + config.getClientid());
|
|
|
|
+ subsribeClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());
|
|
|
|
+
|
|
|
|
+ subsribeClient.setCallback(new PushCallback(MqttServer.this));
|
|
|
|
+ }
|
|
|
|
+ MqttConnectOptions options = mqttConnect.getOptions();
|
|
|
|
+
|
|
|
|
+ if (!subsribeClient.isConnected()) {
|
|
|
|
+ subsribeClient.connect(options);
|
|
|
|
+ } else {
|
|
|
|
+ subsribeClient.disconnect();
|
|
|
|
+ subsribeClient.connect(mqttConnect.getOptions(options));
|
|
|
|
+ }
|
|
|
|
+ LOGGER.info("----------Mqtt客户端连接成功");
|
|
|
|
+ } catch (MqttException e) {
|
|
|
|
+ LOGGER.info(e.getMessage(), e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ * 把组装好的消息发出去
|
|
|
|
+ */
|
|
|
|
+ public boolean publish(MqttTopic topic, MqttMessage message) {
|
|
|
|
+
|
|
|
|
+ MqttDeliveryToken token = null;
|
|
|
|
+ try {
|
|
|
|
+
|
|
|
|
+ token = topic.publish(message);
|
|
|
|
+ token.waitForCompletion();
|
|
|
|
+
|
|
|
|
+ boolean flag = token.isComplete();
|
|
|
|
+
|
|
|
|
+ StringBuffer sbf = new StringBuffer(200);
|
|
|
|
+ sbf.append("给主题为'" + topic.getName());
|
|
|
|
+ sbf.append("'发布消息:");
|
|
|
|
+ if (flag) {
|
|
|
|
+ sbf.append("成功!消息内容是:" + new String(message.getPayload()));
|
|
|
|
+ } else {
|
|
|
|
+ sbf.append("失败!");
|
|
|
|
+ }
|
|
|
|
+ LOGGER.info(sbf.toString());
|
|
|
|
+ } catch (MqttException e) {
|
|
|
|
+ LOGGER.info(e.toString());
|
|
|
|
+ }
|
|
|
|
+ return token.isComplete();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ * MQTT发送指令:主要是组装消息体
|
|
|
|
+ *
|
|
|
|
+ * @param topic 主题
|
|
|
|
+ * @param data 消息内容
|
|
|
|
+ * @param qos 消息级别
|
|
|
|
+ */
|
|
|
|
+ public void sendMQTTMessage(String topic, String data, int qos) {
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ this.publishClient = publishConnect();
|
|
|
|
+ this.topic = this.publishClient.getTopic(topic);
|
|
|
|
+ message = new MqttMessage();
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ message.setQos(qos);
|
|
|
|
+
|
|
|
|
+ message.setRetained(false);
|
|
|
|
+
|
|
|
|
+ message.setPayload(data.getBytes());
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ publish(this.topic, message);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ LOGGER.info(e.toString());
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ * 订阅端订阅消息
|
|
|
|
+ *
|
|
|
|
+ * @param topic 要订阅的主题
|
|
|
|
+ * @param qos 订阅消息的级别
|
|
|
|
+ */
|
|
|
|
+ public void init(String topic, int qos) {
|
|
|
|
+
|
|
|
|
+ subsribeConnect();
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ subsribeClient.subscribe(topic, qos);
|
|
|
|
+ } catch (MqttException e) {
|
|
|
|
+ LOGGER.info(e.getMessage(), e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ * 订阅端取消订阅消息
|
|
|
|
+ *
|
|
|
|
+ * @param topic 要订阅的主题
|
|
|
|
+ */
|
|
|
|
+ public void unionInit(String topic) {
|
|
|
|
+
|
|
|
|
+ subsribeConnect();
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+
|
|
|
|
+ subsribeClient.unsubscribe(topic);
|
|
|
|
+ } catch (MqttException e) {
|
|
|
|
+ LOGGER.info(e.getMessage(), e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|