2013-10-03 47 views
0

我無法找到重新連接mqtt回撥客戶端的邏輯。有方法onDisconnected(),但我無法找到文檔或任何互聯網上的示例。MQTT回撥客戶端重新連接邏輯

我的聽衆

公共類myListener的實現監聽 {

public MyListener() 
    { 

    } 

    @Override 
    public void onConnected() 
    { 
     System.out.println("Connected ...."); 
    } 

    @Override 
    public void onDisconnected() 
    { 
     System.out.println("Disconnected"); 
    } 

    @Override 
    public void onPublish(UTF8Buffer topic, Buffer body, Runnable ack) 
    { 
     System.out.println("Entered Onpublish"); 

     try 
     { 
     System.out.println("received msg:" + msg); 
     } 
     catch (HikeException e) 
     { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     finally{ 
      ack.run(); 
     } 

    } 


    @Override 
    public void onFailure(Throwable value) 
    { 
     value.printStackTrace(); 
    } 

} 

創建連接

private void createConnection(String host, int port,String id, String token) throws Exception 
{ 

    this.disconnect(); 
    MQTT mqtt = new MQTT(); 
    mqtt.setHost(host, port); 
    mqtt.setUserName(id); 
    mqtt.setPassword(token); 
    CallbackConnection callbackConnection = null; 
    callbackConnection = mqtt.callbackConnection(); 
    callbackConnection.listener(new MyListener()); 
    callbackConnection.connect(new MyCallback<Void>("CONNECT")); 
    callbackConnection.subscribe(new Topic[] { new Topic(uid + "/u", QoS.AT_MOST_ONCE) }, new MyCallback<byte[]>("EVENT SUBSCRIBE")); 
    callbackConnection.subscribe(new Topic[] { new Topic(uid + "/s", QoS.AT_LEAST_ONCE), new Topic(uid + "/a", QoS.AT_LEAST_ONCE) }, new MyCallback<byte[]>("MSG SUBSCRIBE")); 

    this.callbackConnection = callbackConnection; 
} 

myCallBack函數

class MyCallback<T> implements Callback<T> 
{ 
    public MyCallback(String tag) 
    { 
     super(); 
     this.tag = tag; 
    } 

    String tag; 

    @Override 
    public void onSuccess(T value) 
    { 
     System.out.println("TAG:" + tag + " =SUCCESS value=" + value); 
    } 

    @Override 
    public void onFailure(Throwable value) 
    { 
     System.out.println("TAG:" + tag + "Fail"); 
     value.printStackTrace(); 
    } 

} 

我的問題是如何實現mqtt重新連接到服務器邏輯?如果我應該使用onDisconnect()方法,那我該如何使用它?

+0

? – knolleary

回答

1

這裏是我如何實現Mqtt在連接丟失時重新連接,啓動一個線程嘗試連接到MqttServer,這將在成功連接時被破壞。

boolean retrying = false; 
    public void reConnect(){ 
     if (!retrying) { 
      retrying = true; 
      new Thread(new Runnable() { 
       @Override 
       public void run() { 
        for (;;) { 
         try { 
          if (isInetAvailable() && !mqttClient.isConnected()) { 
           if(isPasswdProtected) { 
            //connect with MqttConnectionOptions 
            Connect_with_passwd(); 
           } else { 
            Connect(); 
           } 
           Thread.sleep(MQTT_RETRY_INTERVAL); 
          } else if (isConnected()) { 
           List<String> topics = topicsSubscribed; 
           topicsSubscribed.clear(); 
           for (String topic : topics) { 
            try { 
             subscribeToTopic(topic); 
            } catch (MqttException e) { 
            } 
           } 
           retrying = false; 
           break; 
          } else if (!Internet.isAvailable()) { 
           Thread.sleep(INET_RETRY_INTERVEL); 
          } 
         } catch (MqttException | InterruptedException e) { 
          try { 
           Thread.sleep(MQTT_RETRY_INTERVAL); 
          } catch (InterruptedException ex) { 
          } 
         } 
        } 
       } 
      }).start(); 
     } 
} 
/*Check internet connection*/ 

public static boolean isInetAvailable() { 
    boolean connectivity; 
    try { 
     URL url = new URL(GOOGLE); 
     URLConnection conn = url.openConnection(); 
     conn.connect(); 
     connectivity = true; 
    } catch (IOException e) { 
     connectivity = false; 
    } 
    return connectivity; 
} 
0

我已經實現了這樣

 //check when network connectivity is back and implement the connection logic again 
     System.out.println("Connection Lost\n trying to re-connect"); 
     int tries=0; 
     while(true){ 
      Thread.sleep(MQTT_RETRY_INTERVAL); 
      if(checkIfNetworkAvailable()&& !MQTTClient.getInstance().mqttClient.isConnected()){ 
       try{ 
        tries++; 
       MQTTClient.getInstance().mqttClient.connect(MachineDetails.getInstance().getMACDetails(), true, (short) 1000); 
       //register handler 
       MQTTClient.getInstance().mqttClient.registerAdvancedHandler(ApplicationPublishHandler.getInstance()); 
       String[] topics={Constants.PUBLIC_BROADCAST_TOPIC}; 
       int[] qos={1}; 
       MQTTClient.getInstance().mqttClient.subscribe(topics, qos); 
       }catch(Exception e){ 
        //Service down and give an alert 
//     break; 
       } 
       if(tries>No of retries on network available) 
       break; 
      } 
     } 


    private boolean checkIfNetworkAvailable() { 
     try { 
      InetAddress.getByName("<<your host name>>"); 
      return true; 
     } catch (UnknownHostException e) { 
      return false; 
     } 


    } 
你使用的客戶端實現
相關問題