2015-07-13 58 views
1

如果我使用QOS類型1表示代理將繼續向訂戶發送消息,直到獲得確認。我如何設置或返回ack?請任何人都對此有所瞭解。MQTT確認

這是我的源代碼:

import java.io.BufferedWriter; 
import java.io.File; 
import java.io.FileWriter; 
import java.io.IOException; 
import java.io.PrintWriter; 
import java.net.URISyntaxException; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.Properties; 
import java.util.Vector; 

import org.fusesource.hawtbuf.Buffer; 
import org.fusesource.hawtbuf.UTF8Buffer; 
import org.fusesource.mqtt.client.Callback; 
import org.fusesource.mqtt.client.CallbackConnection; 
import org.fusesource.mqtt.client.Listener; 
import org.fusesource.mqtt.client.MQTT; 
import org.fusesource.mqtt.client.QoS; 
import org.fusesource.mqtt.client.Topic; 

import com.adventnet.management.log.Log; 
import com.adventnet.nms.util.NmsLogMgr; 
public class DefaultMqttListener implements IMqttListener,Runnable{ 

    long count = 0; 
    long start = System.currentTimeMillis(); 
    private HashMap serverDetailsHash; 
    public DefaultMqttListener(HashMap serverProp) 
    { 
     this.serverDetailsHash = serverProp; 
    } 
    CallbackConnection myconnection; 
    @Override 
    public void init() { 
     MQTT mqtt = new MQTT(); 
     String user = env("APOLLO_USER", (String)serverDetailsHash.get("userName")); //No I18N 
     String password = env("APOLLO_PASSWORD", (String)serverDetailsHash.get("password")); //No I18N 
     String host = env("APOLLO_HOST", (String)serverDetailsHash.get("hostName")); //No I18N 
     int port = Integer.parseInt(env("APOLLO_PORT", (String)serverDetailsHash.get("port"))); 
     try { 
      mqtt.setHost(host, port); 
      mqtt.setUserName(user); 
      mqtt.setPassword(password); 
      final CallbackConnection connection = mqtt.callbackConnection(); 
      myconnection = connection; 
      connection.listener(new org.fusesource.mqtt.client.Listener() { 
       public void onConnected() { 
       } 
       public void onDisconnected() { 
       } 
       public void onFailure(Throwable value) { 
        value.printStackTrace(); 
        System.exit(-2); 
       } 
       public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) { 
         long time = System.currentTimeMillis(); 
         callback(topic, msg, ack,connection,time); 
       } 
      }); 
      connection.connect(new Callback<Void>() { 
       @Override 
       public void onSuccess(Void value) { 
        NmsLogMgr.M2MERR.log("MQTT Listener connected in ::::", Log.SUMMARY); 
        ArrayList getTopics = (ArrayList)serverDetailsHash.get("Topics"); 
        for(int i=0;i<getTopics.size();i++) 
        { 
         HashMap getTopic = (HashMap)getTopics.get(i); 
         String topicName = (String) getTopic.get("topicName"); 
         String qosType = (String) getTopic.get("qosType"); 
         Topic[] topic = {new Topic(topicName, getQosType(qosType))}; 
         connection.subscribe(topic, new Callback<byte[]>() { 
          public void onSuccess(byte[] qoses) { 
          } 
          public void onFailure(Throwable value) { 
           value.printStackTrace(); 
           System.exit(-2); 
          } 
         }); 
        } 
        //Topic[] topics = {new Topic("adminTest", QoS.AT_LEAST_ONCE),new Topic("adminTest1", QoS.AT_LEAST_ONCE)}; 
       } 
       @Override 
       public void onFailure(Throwable value) { 
        value.printStackTrace(); 
        System.exit(-2); 
       } 
      }); 

      // Wait forever.. 
      synchronized (Listener.class) { 
       while(true){ 
        Listener.class.wait();} 

      } 
     } catch (URISyntaxException e1) { 
      // TODO Auto-generated catch block 
      e1.printStackTrace(); 
     } 
     catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    private static String env(String key, String defaultValue) { 
     String rc = System.getenv(key); 
     if(rc== null){ 
      return defaultValue;} 
     return rc; 
    } 

    @Override 
    public void callback(UTF8Buffer topic, Buffer msg, Runnable ack, CallbackConnection connection, long time) { 
     // TODO Auto-generated method stub 
     try { 
      String Message = msg.utf8().toString(); 
      MQTTMessage mqttMsg = new MQTTMessage(); 
      mqttMsg.setMQTTMessage(Message); 
      mqttMsg.setTime(time); 
      mqttMsg.setTopic(topic); 
      mqttMsg.sethostName((String) serverDetailsHash.get("hostName")); 
      MQTTCacheManager.mgr.addToCache(mqttMsg); 
     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

    } 

    @Override 
    public void close() { 
     // TODO Auto-generated method stub 
      NmsLogMgr.M2MERR.log("myconnection closed", Log.SUMMARY); 
      myconnection.disconnect(new Callback<Void>() { 
      @Override 
      public void onSuccess(Void value) { 
       System.exit(0); 
      } 
      @Override 
      public void onFailure(Throwable value) { 
       value.printStackTrace(); 
       System.exit(-2); 
      } 
     }); 

    } 

    @Override 
    public void run() { 
     this.init(); 
     // TODO Auto-generated method stub 
    } 
    public QoS getQosType(String name) 
    { 
     Properties qosContainer = new Properties(); 
     qosContainer.put("0", QoS.AT_MOST_ONCE); 
     qosContainer.put("1", QoS.AT_LEAST_ONCE); 
     qosContainer.put("2", QoS.EXACTLY_ONCE); 
     QoS qosName = (QoS) qosContainer.get(name); 
     return qosName; 
    } 
} 

回答

1

你不要在你的代碼發送確認的話,那應該全部由您所使用的MQTT庫來處理。

QOS ack的數據包位於發佈者和代理之間,然後分別在代理和任何訂閱者之間。

+0

但是,如果我使用QOS1(ATLEAST_ONCE)或QOS2(EXACTLY_ONCE),我會繼續每隔10秒收到相同的消息。這會一直重複,直到斷開Broker-Client連接。即使在重新連接之後,每10秒鐘都會從代理收到消息。 –

+0

好的,那麼它聽起來像是經紀人或者你使用的圖書館已經損壞或者不支持除QOS 0以外的其他任何內容。 – hardillb

+0

我的一位同事提出了這種方法。 public void onPublish(UTF8Buffer topic,Buffer msg,Runnable ack){long} = System.currentTimeMillis();回調(主題,味精,確認,連接,時間); ack.run();這是對的 ??? –

0

我沒有使用Java庫,但需要訂閱指定QoS級別1(至少具有一個傳送)或QoS級別2(只有一次傳送)的主題。在這些情況下,底層庫將ACK數據包發送給代理。

Paolo。