2015-10-28 115 views
0

我在我的項目中使用MQTT API(Paho,org.eclipse.paho:org.eclipse.paho.client.mqttv3)來讀取數據。問題是:MQTT回調沒有關閉。

當我關閉客戶端後,MqttCallback仍然有效。我是通過在客戶端對象上依次調用 unsubscribe(...)、disconnect() 和 close() 來關閉mqtt的。

我試着將回調設置爲null。沒什麼幫助。

mqtt客戶端關閉後,以下堆棧跟蹤仍然不斷出現在日誌中。

> com.xxx.binge.sources.mqtt.BingeMQTTReader$BingeMQTTCallback.connectionLost(BingeMQTTReader.java:479) 
>    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.connectionLost(CommsCallback.java:247) 
>    at org.eclipse.paho.client.mqttv3.internal.ClientComms.shutdownConnection(ClientComms.java:356) 
>    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146) 
>    at java.lang.Thread.run(Thread.java:745) 
>  Caused by: com.xxx.vds.api.VDSException: MQTT Connection exception 
>    at com.xxx.binge.sources.mqtt.BingeMQTTReader.reconnectWithRetry(BingeMQTTReader.java:334) 
>    at com.xxx.binge.sources.mqtt.BingeMQTTReader.access$200(BingeMQTTReader.java:46) 
>    at com.xxx.binge.sources.mqtt.BingeMQTTReader$BingeMQTTCallback.connectionLost(BingeMQTTReader.java:473) 
>    ... 4 more 
>  Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused 
>    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79) 
>    at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590) 
>    ... 1 more 
>  Caused by: java.net.ConnectException: Connection refused 
>    at java.net.PlainSocketImpl.socketConnect(Native Method) 
>    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) 
>    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) 
>    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) 
>    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
>    at java.net.Socket.connect(Socket.java:579) 
>    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70) 
>    ... 2 more 

回答

1

試試這個代碼

// Construct the MqttClient instance for the given broker URL and client id
     MqttClient client = new MqttClient(brokerUrl, clientId); 
// Register this object as the callback handler; done here before connect() is called
     client.setCallback(this); 
// Connect to the server 
     client.connect(); 
// Disconnect the client
// NOTE(review): the snippet shows only the lifecycle calls; presumably the real
// subscribe/publish work goes between connect() and disconnect() - confirm.
     client.disconnect(); 
+0

我的MqttClient已經正確停止了。問題在於MqttCallback沒有關閉,此線程繼續運行。這是我設置回調的方式: ' BingeMQTTCallback implements MqttCallback {...} callback = new BingeMQTTCallback(); client.setCallback(callback); ' 問題中發佈的日誌是在調用 client.disconnect() 之後出現的。 – nandini

+0

將當前包裝設置爲回調處理程序 –

+0

好的,將嘗試這個並嘗試更新。目前我在應用程序中將其作爲內部類。 – nandini

1

MQTT發佈Java代碼示例,

import java.text.SimpleDateFormat; 
import java.util.Date; 
import org.apache.log4j.Logger; 
import com.ibm.micro.client.mqttv3.MqttCallback; 
import com.ibm.micro.client.mqttv3.MqttClient; 
import com.ibm.micro.client.mqttv3.MqttDeliveryToken; 
import com.ibm.micro.client.mqttv3.MqttException; 
import com.ibm.micro.client.mqttv3.MqttMessage; 
import com.ibm.micro.client.mqttv3.MqttTopic; 

/**
 * Small MQTT publisher that registers itself as the client's {@link MqttCallback}.
 *
 * <p>{@link #send(String, String)} builds a fresh instance for the configured broker
 * and publishes one message; {@link #publish(String, int, byte[])} performs the
 * connect / publish / wait / disconnect sequence on this instance's client.
 *
 * <p>NOTE(review): the constructor and {@link #connectionLost(Throwable)} terminate the
 * JVM with {@code System.exit(1)}. That behavior is kept as-is here, but it is worth
 * confirming that it is really intended for code that may run inside a larger application.
 */
public class testMqtt implements MqttCallback {

    // Previously keyed on Publish.class, a type that is not declared in this file.
    final static Logger logger = Logger.getLogger(testMqtt.class);

    public SimpleDateFormat loggerdateFormat = new SimpleDateFormat(
            "yyyy MMM dd HH:mm:ss");

    // Guards loggerdateFormat: SimpleDateFormat is not thread-safe, and the callback
    // methods below are presumably invoked from a client-library thread (confirm).
    private final Object formatLock = new Object();

    // Private instance variables
    private MqttClient client;
    private String brokerUrl;

    /**
     * Creates a client for the given broker and registers this object as its callback.
     * On any failure the error is logged and the JVM exits with status 1.
     *
     * @param brokerUrl broker address passed to the {@code MqttClient} constructor
     * @param clientId  client identifier passed to the {@code MqttClient} constructor
     * @throws MqttException declared for callers; failures inside the body are caught
     */
    public testMqtt(String brokerUrl, String clientId) throws MqttException {
        this.brokerUrl = brokerUrl;
        try {
            // Construct the MqttClient instance
            client = new MqttClient(this.brokerUrl, clientId);
            // Set this wrapper as the callback handler
            client.setCallback(this);
        } catch (Exception e) {
            // Log with the throwable so the stack trace reaches the configured appenders.
            logger.error(timestamp() + " : client setup failed", e);
            log("Unable to set up client: " + e.toString());
            System.exit(1);
        }
    }

    /**
     * Creates an instance without a client. {@link #publish(String, int, byte[])} on
     * such an instance logs "Unable to connect" and returns, because the client is null.
     */
    public testMqtt() {
        // Intentionally empty: no client is created.
    }

    /**
     * Publishes {@code msg} to {@code topicName} at QoS 2 using a new client for the
     * broker configured in {@code Config.Mqtt}.
     *
     * @param topicName topic to publish to
     * @param msg       message text; {@code null} or the sentinel "-1" is rejected
     * @return "fail" when the message is rejected or an exception is raised while
     *         creating the client or publishing; "ok" otherwise
     */
    public String send(String topicName, String msg) {
        logger.info(timestamp() + " Control on method : send ");
        logger.info(timestamp() + " Message " + msg);
        // A null message used to fall through to msg.trim() and throw.
        if (msg == null || msg.trim().equals("-1")) {
            logger.error(timestamp() + " : message wrong");
            return "fail";
        }
        int qos = 2;
        String url = "tcp://" + Config.Mqtt.BROCKER + ":" + Config.Mqtt.PORT;
        String clientId = "Client-1";

        try {
            testMqtt sampleClient = new testMqtt(url, clientId);
            sampleClient.publish(topicName, qos, msg.getBytes());
        } catch (Exception me) {
            // Previously the exception was printed and "ok" was still returned.
            logger.error(timestamp() + " : publish failed", me);
            return "fail";
        }
        // NOTE(review): publish() logs and returns normally when the connection attempt
        // fails, so that case still reports "ok" - confirm whether that is acceptable.
        return "ok";
    }

    /**
     * Connects, publishes one message, waits for delivery and disconnects.
     *
     * <p>If connecting fails the failure is logged and the method returns without
     * publishing. If publishing or waiting fails, the client is disconnected before
     * the exception propagates, so a failed publish does not leave it connected.
     *
     * @param topicName topic to publish to
     * @param qos       quality-of-service level set on the message
     * @param payload   message body
     * @throws MqttException if publishing, waiting for completion, or the final
     *                       disconnect fails
     */
    public void publish(String topicName, int qos, byte[] payload)
            throws MqttException {
        log("Connecting to " + brokerUrl);
        try {
            client.connect();
        } catch (Exception e) {
            log("Unable to connect: " + e.toString());
            return;
        }
        log("Connected !!!");

        boolean delivered = false;
        try {
            // Get an instance of the topic
            MqttTopic topic = client.getTopic(topicName);

            // Construct the message to publish
            MqttMessage message = new MqttMessage(payload);
            message.setQos(qos);

            log("Publishing to topic \"" + topicName + "\" qos " + qos);
            MqttDeliveryToken token = topic.publish(message);

            // Wait until the message has been delivered to the server
            token.waitForCompletion();
            delivered = true;
        } finally {
            if (!delivered) {
                // Failure path: release the connection without masking the original error.
                disconnectQuietly();
            }
        }

        // Success path: a disconnect failure is reported to the caller, as before.
        client.disconnect();
        log("Disconnected");
    }

    /** Disconnects the client, logging instead of throwing if that fails. */
    private void disconnectQuietly() {
        try {
            client.disconnect();
            log("Disconnected");
        } catch (Exception e) {
            log("Unable to disconnect: " + e.toString());
        }
    }

    /** Formats the current time with {@link #loggerdateFormat} under {@code formatLock}. */
    private String timestamp() {
        synchronized (formatLock) {
            return loggerdateFormat.format(new Date());
        }
    }

    /** Writes {@code message} to the logger at INFO level, prefixed with a timestamp. */
    private void log(String message) {
        logger.info(timestamp() + ": log - " + message);
    }

    /**
     * Called when the connection to the server has been lost. Logs the event and
     * exits the JVM with status 1.
     *
     * @param cause reason for the loss (not used)
     */
    public void connectionLost(Throwable cause) {
        log("Connection to " + brokerUrl + " lost!");
        System.exit(1);
    }

    /**
     * Called when delivery of a message has completed; logs a confirmation.
     *
     * @param token delivery token of the completed message (not used)
     */
    public void deliveryComplete(MqttDeliveryToken token) {
        log("Message Delivered Successfully!!!");
    }

    /**
     * Called when a message arrives from the server. Logs the topic name, the payload
     * decoded with the platform default charset, and the QoS.
     *
     * @param topic   topic the message arrived on
     * @param message the received message
     * @throws MqttException declared for the payload and QoS accessors
     */
    public void messageArrived(MqttTopic topic, MqttMessage message)
            throws MqttException {
        logger.info(timestamp() + " : Topic:\t\t" + topic.getName());
        logger.info(timestamp() + " : Message:\t"
                + new String(message.getPayload()));
        logger.info(timestamp() + " : QoS:\t\t" + message.getQos());
    }

}
+0

paho mqtt not ibm .. group id org.eclipse.paho/artifact id:org.eclipse.paho org.eclipse.paho.client.mqttv3 我試着對此設置回調。同樣仍然:(。 – nandini