2014-07-17 58 views
1

我有兩個主題和一個經紀人的網址。我需要使用一個代理網址發佈到這兩個主題。一個經紀人的網址有很多話題在mqtt

我用一個代理網址和一個主題完成了它。然後,我嘗試着處理兩個主題,併爲每個主題編寫兩個訂戶類,但是當我運行兩個訂戶類時,會顯示連接丟失。 建議一些很好的例子來做到這一點。

MQTTPublisher.java

import java.io.BufferedReader; 
import java.io.InputStreamReader; 

import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.MqttClientPersistence; 
import org.eclipse.paho.client.mqttv3.MqttDefaultFilePersistence; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.MqttTopic; 

public class MQTTPublisher { 


static final String BROKER_URL = "tcp://localhost:1883";// public mosquitto 
                 // server 
static final String TOPIC = "iotm/ej";// Change according to your 
              // application 
static final String TOPIC1 = "iotm/stream1"; 

public static void main(String args[]) { 

    try { 
     // Creating new default persistence for mqtt client 
     MqttClientPersistence persistence = new MqttDefaultFilePersistence(
       "/tmp"); 

     // mqtt client with specific url and client id 
     MqttClient client1 = new MqttClient(BROKER_URL, "Publisher-ID", 
       persistence); 

     client.connect(); 

     MqttTopic myTopic = client1.getTopic(TOPIC); 
     MqttTopic myTopic1 = client1.getTopic(TOPIC1); 
     String msg = "AMMA!DEVI!dURGA"; 
     System.out.println("Enter the message to publish,Type quit to exit\n"); 
     BufferedReader br = new BufferedReader(new 
     InputStreamReader(System.in)); 
     msg = br.readLine(); 
     while (!msg.equals("quit")) { 
     myTopic.publish(new MqttMessage(msg.getBytes())); 
     System.out.println("Message published on" + TOPIC); 
     myTopic1.publish(new MqttMessage(msg.getBytes())); 

     System.out.println("Message published on" + TOPIC1); 
     msg = br.readLine(); 
     } 
     myTopic.publish(new MqttMessage(msg.getBytes())); 

     myTopic1.publish(new MqttMessage(msg.getBytes())); 

     // client.disconnect(); 
    } catch (MqttException e) { 
     e.printStackTrace(); 
    } catch (Exception e) { 
     System.err.println(e.getMessage()); 
    } 
} 
} 

MQTTSubscriber.java

import org.eclipse.paho.client.mqttv3.MqttCallback; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.MqttDefaultFilePersistence; 
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.MqttTopic; 

public class MQTTSubscriber { 

static final String BROKER_URL = "tcp://localhost:1883";// public 
// mosquitto server 
static final String TOPIC = "iotm/ej"; // Change according to your 



public static void main(String args[]) { 

    try { 
     // Creating new default persistence for mqtt client 
     MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence(
       "/tmp"); 

     // mqtt client with specific url and a random client id 
     MqttClient client1 = new MqttClient(BROKER_URL, "Subscriber-ID", 
       persistence); 
     client1.connect(); 
     System.out.println("Subscribing to topic '" + TOPIC + "' from " 
       + client1.getServerURI()); 
     // Subscribing to specific topic 
     client1.subscribe(TOPIC); 

     // It will trigger when a new message is arrived 
     MqttCallback callback = new MqttCallback() { 
      @Override 
      public void messageArrived(MqttTopic arg0, MqttMessage arg1) 
        throws Exception { 
       System.out.println("Message:" 
         + new String(arg1.getPayload())); 

      } 

      @Override 
      public void deliveryComplete(MqttDeliveryToken arg0) { 

      } 

      @Override 
      public void connectionLost(Throwable arg0) { 
       System.out.println("Connection lost"); 
      } 
     }; 
     // Continue waiting for messages until the Enter is pressed 
     client1.setCallback(callback); 
     /* 
     * System.out.println("Press <Enter> to exit"); try { 
     * System.in.read(); } catch (IOException e) { // If we can't read 
     * we'll just exit } 
     */ 
     // client.disconnect(); 
     // System.out.println("Client Disconnected"); 

    } catch (MqttException e) { 
     e.printStackTrace(); 
    } catch (Exception e) { 
     System.err.println(e.getMessage()); 
    } 
} 

} 

MQTTSubscriber2.java

import org.eclipse.paho.client.mqttv3.MqttCallback; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.MqttDefaultFilePersistence; 
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.MqttTopic; 

    public class MQTTSubscriber2 { 


static final String BROKER_URL = "tcp://localhost:1883";// public 
// mosquitto server 
static final String TOPIC = "iotm/stream1"; 



public static void main(String args[]) { 

    try { 
     // Creating new default persistence for mqtt client 
     MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence(
       "/tmp"); 

     // mqtt client with specific url and a random client id 
     MqttClient client = new MqttClient(BROKER_URL, "Subscriber-ID", 
       persistence); 
     client.connect(); 
     System.out.println("Subscribing to topic '" + TOPIC + "' from " 
       + client.getServerURI()); 
     // Subscribing to specific topic 
     client.subscribe(TOPIC); 

     // It will trigger when a new message is arrived 
     MqttCallback callback = new MqttCallback() { 
      @Override 
      public void messageArrived(MqttTopic arg0, MqttMessage arg1) 
        throws Exception { 
       System.out.println("Message:" 
         + new String(arg1.getPayload())); 

      } 

      @Override 
      public void deliveryComplete(MqttDeliveryToken arg0) { 

      } 

      @Override 
      public void connectionLost(Throwable arg0) { 
       System.out.println("Connection lost"); 
      } 
     }; 
     // Continue waiting for messages until the Enter is pressed 
     client.setCallback(callback); 
     /* 
     * System.out.println("Press <Enter> to exit"); try { 
     * System.in.read(); } catch (IOException e) { // If we can't read 
     * we'll just exit } 
     */ 
     // client.disconnect(); 
     // System.out.println("Client Disconnected"); 

    } catch (MqttException e) { 
     e.printStackTrace(); 
    } catch (Exception e) { 
     System.err.println(e.getMessage()); 
    } 
} 

}

+0

你可以發佈一些示例代碼,顯示問題?也不清楚,但你使用2個客戶端實例嗎?如果是這樣,他們將需要不同的客戶ID – hardillb

+0

@hardillb:添加我的示例代碼..其實我是新來的mqtt所以我只是做一個基於網絡的例子 – PathuZ

回答

3

如果您運行的是潛艇的2種獨立的情況下, criber code,那麼他們都需要不同的客戶端ID。如果您使用相同的方式運行2,那麼當第二次連接時,第一個將與代理斷開連接。