2015-07-01 69 views
6

我試圖在Eclipse中使用Java Paho在Java中實現MQTT客戶端上的某些功能。目標是訂閱一個主題,當收到一條消息時,客戶端會在另一個主題上發送另一條消息。如何使用Eclipse在Java MQTT客戶端上接收消息時發佈消息Paho

這看起來很容易,但我有一個奇怪的問題,我無法解決。這裏是我的代碼:

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.IMqttToken; 
import org.eclipse.paho.client.mqttv3.MqttCallback; 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 

public class MqttOperations implements MqttCallback { 

    MqttClient sampleClient; 
    MqttConnectOptions connOpts; 

    public MqttOperations() { 
    } 

    public static void main(String[] args) throws InterruptedException { 
     new MqttOperations().launchMqttClient(); 
    } 


    public void launchMqttClient() throws InterruptedException { 
     try { 
       MemoryPersistence persistence = new MemoryPersistence(); 
       sampleClient = new MqttClient("tcp://broker.mqttdashboard.com:1883", "iamaclient", persistence); 
       connOpts = new MqttConnectOptions(); 
       connOpts.setCleanSession(true); 
       sampleClient.connect(connOpts); 
       sampleClient.subscribe("topic/example/ofmessage"); 
       sampleClient.setCallback(this); 

      } catch(MqttException me) { 
       System.out.println("reason "+me.getReasonCode()); 
       System.out.println("msg "+me.getMessage()); 
       System.out.println("loc "+me.getLocalizedMessage()); 
       System.out.println("cause "+me.getCause()); 
       System.out.println("excep "+me); 
       me.printStackTrace(); 
      } 
    } 


    @Override 
    public void connectionLost(Throwable cause) { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void messageArrived(String topic, MqttMessage message) throws MqttException 
    { 
     System.out.println("Received: " + message.toString()); 
     try{ 
      System.out.println("Publishing message: i am the answer"); 
      MqttMessage ans = new MqttMessage("i am the answer".getBytes()); 
      ans.setQos(2); 
      sampleClient.publish("topic/example/ofanswer", ans); 
      System.out.println("Message published"); 

     }catch(MqttException me){ 
       System.out.println("reason "+me.getReasonCode()); 
       System.out.println("msg "+me.getMessage()); 
       System.out.println("loc "+me.getLocalizedMessage()); 
       System.out.println("cause "+me.getCause()); 
       System.out.println("excep "+me); 
       me.printStackTrace(); 
     } 

    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 

    } 

} 

事情是,這個程序只工作一次。收到郵件時,會發送對此郵件的答覆,但看起來郵件「發佈的郵件」永遠不會顯示在屏幕上,並且客戶端也不會收到任何其他郵件。 我有這樣的印象,sampleClient.publish("topic/example/ofanswer", ans);行從來沒有完成它的執行。 有誰知道它是怎麼來的,請問如何解決我的問題?

+0

另一個精度:我發現一些資料來源說明我應該小心,不要回應我自己的回答,否則它不能很好地工作。但我認爲我不關心這個問題,因爲我用來訂閱和發佈回覆的主題不同 – tben

+0

我認爲這是一個問題,你在消息獲得回調中阻塞。您可以嘗試在另一個線程中發佈(例如,使用Executor並僅在messageArrived回調中提交發布命令)? –

回答

2

今天我有類似的問題。當我讀an other question with two connections我明白了:你需要兩個MqttClient實例。一個用於發佈,另一個用於訂閱。不幸的是,我沒有發現這個事實的文件。

順便說一句。在我第一次實現兩個客戶端時,我給了他們相同的ID(邏輯上它應該是相同的連接)。但第二個連接斷開了第一個連接。當我開始使用兩個不同的ID時,它開始工作。

+0

它真的幫了我。 :) –

+0

如果您爲messageArrived()閱讀文檔http://www.eclipse.org/paho/files/javadoc/index.html,則會困擾我,因爲您不需要有兩個連接,但是,它作品如此歡呼你! – Clocker

0

Dominik Obermaier是對的:問題是你在messageArrived中阻塞。具體來說,MqttClient.publish會一直等待,直到接收到消息的傳遞通知爲止 - 但MqttClient工作線程永遠不會檢索它,因爲它正在等待messageArrived中的通知!

雙客戶端解決方案的工作原理是,其他客戶端的工作線程可以自由地從套接字中檢索通知,但正確的解決方案是從messageArrived中發佈QoS 0(因爲QoS 0消息不需要確認傳遞)或使用不等待消息傳遞的API,例如MqttTopic.publish。

相關問題