2017-08-13 176 views
1

我一直在嘗試將MQTT合併到我的Android應用程序中。我原本是在一個活動中工作,並從此嘗試將其移動到服務中以在後臺運行。我能夠連接併發送來自服務的消息,但我無法接收消息。我的服務實現了MqttCallback並覆蓋了messageReceived()函數,但它似乎永遠不會被調用。有人可以幫我理解爲什麼回調不會觸發嗎?Android PAHO MQTT messageArrived回調不觸發

package com.example.test; 

import java.text.SimpleDateFormat; 
import java.util.Calendar; 

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.MqttCallback; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.MqttClientPersistence; 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.MqttToken; 
import org.eclipse.paho.client.mqttv3.MqttTopic; 
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; 

import android.app.Service; 
import android.content.Intent; 
import android.os.Bundle; 
import android.os.IBinder; 
import android.util.Log; 

public class MqttService extends Service implements MqttCallback{ 

    final static String MY_ACTION = "MqttService"; 
    String username; 

    //MQTT Variables 
    MqttClient mySubClient; 
    MqttConnectOptions connOpt; 

    String BROKER_URL; 
    String PUB_TOPIC; 
    String SUB_TOPIC; 
    String PUB_CLIENT_ID; 
    String SUB_CLIENT_ID; 
    Boolean MqttConnState; 
    Boolean IsRunning; 



    static final String TOPIC_BASE = MyProperties.TOPIC_BASE; 
    static final String PI_BROKER_URL = MyProperties.PI_BROKER_URL; //no encryption 
    static final String PI_SSL_BROKER_URL = MyProperties.PI_SSL_BROKER_URL; //ssl enabled 




    @Override 
    public IBinder onBind(Intent arg0) { 
     // TODO Auto-generated method stub 
     return null; 
    } 

    @Override 
    public int onStartCommand(Intent intent, int flags, int startId) { 


     Log.i("log_mqtt_service", "Service onStartCommand entered"); 

     //Get session id from previous activity 
     Bundle extras = intent.getExtras(); 
     if (extras != null) { 
      username = extras.getString("SESSION_ID"); 
     } 

     //set mqtt vars 
     BROKER_URL = PI_BROKER_URL; 
     PUB_CLIENT_ID = "My-Pub-" + username; 
     SUB_CLIENT_ID = "My-Sub-" + username; 
     SUB_TOPIC = TOPIC_BASE + username; 
     PUB_TOPIC = TOPIC_BASE + "test"; 
     IsRunning = false; 


     MqttThread myThread = new MqttThread(); 
     myThread.start(); 

     return super.onStartCommand(intent, flags, startId); 

    } 

    public class MqttThread extends Thread{ 

     @Override 
     public void run() { 
      if(IsRunning==false) 
      { 
       IsRunning = true; 
       connMqtt(); 

      } 
      while(IsRunning){ 
       //keep thread running 
       Log.i("log_mqtt_thread_send", "thread sending mqtt message"); 
       sendMsg(PUB_TOPIC, "0x0001", "Message from " + PUB_CLIENT_ID); 
      } 
      Log.i("log_mqtt_thread", "thread stopped"); 
     } 
    } 

    @Override 
    public void onDestroy() 
    { 
     super.onDestroy(); 
     try { 
      mySubClient.disconnect(); 
     } catch (MqttException e) { 
      Log.e ("log_mqtt_disconnect",e.toString()); 
     } 
    } 

    @Override 
    public void connectionLost(Throwable cause) { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void messageArrived(String topic, MqttMessage message) 
      throws Exception { 
     String msgStr; 
     msgStr = new String(message.getPayload()); 

     Log.i("log_mqtt_Rx", "-------------------------------------------------"); 
     Log.i("log_mqtt_Rx", "| Topic: " + topic.toString()); 
     Log.i("log_mqtt_Rx", "| Message: " + msgStr); 
     Log.i("log_mqtt_Rx", "-------------------------------------------------"); 

     Intent intent = new Intent(); 
     intent.setAction(MY_ACTION); 
     intent.putExtra("RX_MESSAGE", msgStr); 
     sendBroadcast(intent); 

    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 
     // TODO Auto-generated method stub 

    } 

    // ------------------------------------------------------------------------ 
    // Function to make mqtt connection and subscribe 
    // ------------------------------------------------------------------------ 
    int connMqtt() 
    { 
     int result=0; 
     MqttClientPersistence persistence = new MqttDefaultFilePersistence(getBaseContext().getApplicationInfo().dataDir); 

     try{ 
      Log.i("log_mqtt","mqtt start"); 
      try{ 
       // setup MQTT Client 
       connOpt = new MqttConnectOptions(); 
       connOpt.setCleanSession(true); 
       connOpt.setKeepAliveInterval(30); 
       Log.i("log_mqtt","connOpt vars setup"); 
      }catch(Exception e){ 
       result = -1; 
       Log.e ("log_mqtt",e.toString()); 
      } 

      // Connect to Broker for Subscriber connection 
      try { 
       mySubClient = new MqttClient(BROKER_URL, SUB_CLIENT_ID, persistence); 
       Log.i("log_mqtt_conn","create mqttClient"); 
       mySubClient.connect(connOpt); 
       Log.i("log_mqtt_conn","MQTT client connected to " + BROKER_URL); 
      } catch (MqttException e) { 
       result = -2; 
       Log.i ("log_mqtt_conn","BROKER: " + BROKER_URL); 
       Log.i ("log_mqtt_conn","SUB_CLIENT_ID: " + SUB_CLIENT_ID); 
       Log.e ("log_mqtt_conn",e.toString()); 
      } 


      try { 
       int subQoS = 0; 
       mySubClient.subscribe(SUB_TOPIC, subQoS); 
       Log.i("log_mqtt_sub","mqtt client subscribed to \"" + SUB_TOPIC + "\""); 
      } catch (Exception e) { 
       result = -3; 
       Log.e ("log_mqtt_sub",e.toString()); 
      } 
     }catch(Exception e){ 
      result = -4; 
      Log.e ("log_mqtt",e.toString()); 
     } 

      return result; 
    } 

    // ------------------------------------------------------------------------ 
    // Function to send mqtt message to a topic 
    // ------------------------------------------------------------------------ 
    public void sendMsg(String sendTopic, String msgid, String msg) 
    { 
     String timeStamp = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Calendar.getInstance().getTime()); 
     String pubMsg = "{\"msgid\":\"" + msgid + "\",\"time\":\"" + timeStamp + "\", \"message\":\"" + msg + "\"}"; 
     int pubQoS = 0; 
     MqttMessage message = new MqttMessage(pubMsg.getBytes()); 
     message.setQos(pubQoS); 
     message.setRetained(false); 

     //Topic for publisher 
     MqttTopic pubTopic = mySubClient.getTopic(sendTopic); 

     // Publish the message 
     Log.i("log_mqtt_send","Publishing to topic \"" + pubTopic + "\" qos " + pubQoS); 
     Log.i("log_mqtt_msg", pubMsg); 
     MqttDeliveryToken token = null; 
     try { 
      // Publish message to broker then disconnect 
      token = pubTopic.publish(message); 
      // Wait until the message has been delivered to the broker 
      token.waitForCompletion(); 
      Thread.sleep(1000); 
     } catch (Exception ex) { 
      Log.e("log_mqtt_error",ex.toString()); 
     } 
    } 
} 

回答

0

你必須在你的'mySubClient.setCallBack'上添加回調,那麼只有你的回調方法會被調用。