2014-07-16 134 views
12

我是Android和服務的新手。我的目標是能夠設置訂閱並製作關於主題字符串的出版物。解析文本字段的輸入後,主題字符串和客戶端ID被設置。我正在使用Paho MQTT service(下載源代碼並構建了JAR)。android - Paho MQTT服務發佈

以下代碼會在c.publish()處產生空指針異常。logcat顯示異常發生在MqttAndroidClient中處理遞送令牌(delivery token)的方法裏。

// Activity from the question: wires up the submit button and, inside its click
// listener, tries to connect to an MQTT broker and publish a single message.
// The excerpt is abbreviated ("// ..." markers) and is not closed at the end.
public class MainActivity extends Activity { 
    @Override 
    protected void onCreate(Bundle savedInstanceState) { 
     super.onCreate(savedInstanceState); 
     setContentView(R.layout.activity_main); 

     // Set locale; 
     // NOTE(review): `l` is not declared anywhere in this excerpt - presumably a field omitted from the post.
     l = getResources().getConfiguration().locale; 
    } 

    @Override 
    protected void onResume() { 
     super.onResume(); 
     // The click listener is (re)installed every time the activity resumes.
     addButtonListener();   
    } 

    // Looks up the submit button and attaches the click listener that performs the MQTT publish.
    private void addButtonListener() { 
     Button submitButton = (Button) findViewById(R.id.buttonSubmit); 

     submitButton.setOnClickListener(new OnClickListener() { 
// ... 
// validation code for fields in layout 
// ... 
// Finally, this. 

        // A fresh client with a random client id is created for this submission.
        MemoryPersistence mPer = new MemoryPersistence(); 
        String clientId = UUID.randomUUID().toString(); 
        String brokerUrl = "tcp://m2m.eclipse.org:1883"; 
        MqttAndroidClient c = new MqttAndroidClient(getApplicationContext(), brokerUrl, clientId, mPer); 
        try { 
         // connect() is called without a listener and its result is not awaited;
         // publish() below runs immediately afterwards.
         c.connect(); 
         String topic = "transfers/topic"; 
         // NOTE(review): the next statement has no terminating ';' as posted.
         String msg = "topic payload" 
         MqttMessage m = new MqttMessage(); 
         m.setPayload(msg.getBytes()); 
         m.setQos(2); 
         m.setRetained(false); 
         // This is the call at which the question reports the NullPointerException.
         c.publish(topic, m); 
        } catch (MqttException e) { 
         // Only MqttException is surfaced to the user; other exceptions are not caught here.
         Toast.makeText(getApplicationContext(), e.getMessage(), Toast.LENGTH_SHORT).show(); 
        } 

您能否告訴我如何使用該服務發佈和訂閱?我確實瀏覽了示例項目(來自Paho Android)。隨着LWT的佈局(activity_publish.xml)似乎也用於發佈,LWT和發佈似乎被合併。

回答

11

出現NullPointerException是因爲connect()是異步方法:它返回時連接尚未建立。您需要實現IMqttActionListener,並在連接成功的回調(onSuccess)中再發送消息。

// Connects to the broker, publishes one QoS 2 message once the connection is up,
// and disconnects after the broker has confirmed delivery.
// `context`, `username` and `LOGTAG` are expected to be defined by the surrounding code.
Log.i(LOGTAG, "MQTT Start");

MemoryPersistence memPer = new MemoryPersistence();

final MqttAndroidClient client = new MqttAndroidClient(
    context, "tcp://192.168.0.13:1883", username, memPer);

try {
    // connect() only starts the connection attempt; the listener is notified when
    // it finishes, so publishing has to wait for onSuccess.
    client.connect(null, new IMqttActionListener() {

        @Override
        public void onSuccess(IMqttToken mqttToken) {
            Log.i(LOGTAG, "Client connected");
            // getTopics() returns an array (possibly null); format its contents
            // instead of concatenating the array reference.
            Log.i(LOGTAG, "Topics=" + java.util.Arrays.toString(mqttToken.getTopics()));

            MqttMessage message = new MqttMessage("Hello, I am Android Mqtt Client.".getBytes());
            message.setQos(2);
            message.setRetained(false);

            try {
                // publish() is asynchronous too: disconnecting right after the call
                // could interrupt the QoS 2 exchange, so disconnect from the
                // publish listener instead.
                client.publish("messages", message, null, new IMqttActionListener() {

                    @Override
                    public void onSuccess(IMqttToken publishToken) {
                        Log.i(LOGTAG, "Message published");
                        try {
                            client.disconnect();
                            Log.i(LOGTAG, "Disconnect requested");
                        } catch (MqttException e) {
                            Log.e(LOGTAG, "Disconnect failed", e);
                        }
                    }

                    @Override
                    public void onFailure(IMqttToken publishToken, Throwable cause) {
                        Log.e(LOGTAG, "Publish failed", cause);
                    }
                });
            } catch (MqttException e) {
                // MqttPersistenceException is a subclass and is handled here as well.
                Log.e(LOGTAG, "Publish could not be started", e);
            }
        }

        @Override
        public void onFailure(IMqttToken arg0, Throwable arg1) {
            // Pass the Throwable itself: it may be null, and the stack trace is useful.
            Log.e(LOGTAG, "Client connection failed", arg1);
        }
    });
} catch (MqttException e) {
    // The original try block had no catch clause and therefore did not compile.
    Log.e(LOGTAG, "Could not start MQTT connect", e);
}
+0

這真的很有趣,但是怎麼能看出這個方法是異步執行的呢?在C#中不可能有這樣的實現,因爲connect()方法需要被標記爲async才能避免這種錯誤。 –

2

請注意,您需要調用client.setCallback並實現MqttCallbackHandler,才能收到您所訂閱主題的消息,這一點非常重要。

下面是代碼的例子,可以發佈和訂閱等

下面的代碼首先發佈以下MQTT主題和有效載荷:

  • 主題: AndroidPhone
  • 有效載荷:你好,我是一個Android Mqtt客戶端。

該代碼訂閱主題「tester」。如果它收到主題爲「tester」、有效載荷爲「Alarm Activated」的消息,就會(通過上面提到的回調)發佈以下主題和有效載荷:

  • 主題: Fitlet
  • 有效載荷:你好,Mosquitto Broker已收到您的消息,內容是報警已激活。

如果您使用的是Mosquitto,在終端中輸入以下命令即可觸發上述消息的發佈:

mosquitto_pub -h 192.168.9.100 -t tester -m "Alarm Activated" -u fred -P 1234 

其中我的Mosquitto用戶名是fred,密碼是1234。

代碼:

package colin.android.mqtt;  
import android.support.v7.app.AppCompatActivity; 
import android.os.Bundle; 
import android.util.Log; 
import android.widget.Toast; 
import org.eclipse.paho.android.service.MqttAndroidClient; 
import org.eclipse.paho.client.mqttv3.IMqttActionListener; 
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.IMqttToken; 
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; 
import org.eclipse.paho.client.mqttv3.MqttClient; 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 
import org.eclipse.paho.client.mqttv3.MqttPersistenceException; 
import java.io.UnsupportedEncodingException; 

/**
 * Demo activity that connects to a Mosquitto broker, publishes a greeting on
 * {@code AndroidPhone} and subscribes to {@code tester}.
 *
 * <p>Incoming messages are delivered to the {@link MqttCallbackHandler} registered
 * with {@code setCallback}. Every client operation is asynchronous, so each
 * follow-up step runs from the listener of the step before it.
 *
 * <p>NOTE(review): the client is never disconnected or released when the activity
 * is destroyed - confirm whether that is intended for this demo.
 */
public class MainActivity extends AppCompatActivity {

    private static final String TAG = "mqtt";

    /** The URL of the Mosquitto broker. */
    private static final String BROKER_URI = "tcp://192.168.9.100:1883";

    private static final String PUBLISH_TOPIC = "AndroidPhone";
    private static final String SUBSCRIBE_TOPIC = "tester";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        String clientId = MqttClient.generateClientId();
        final MqttAndroidClient client =
                new MqttAndroidClient(this.getApplicationContext(), BROKER_URI, clientId);

        // Registered before connecting so that no incoming message is missed.
        client.setCallback(new MqttCallbackHandler(client));

        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("fred");
        options.setPassword("1234".toCharArray());

        try {
            // Hand the listener to connect() itself instead of attaching it to the
            // returned token afterwards, so it is in place before the attempt starts.
            client.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    // We are connected
                    Log.d(TAG, "onSuccess");
                    publishGreeting(client);
                    subscribeToTester(client);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    // Something went wrong e.g. connection timeout or firewall problems
                    Log.e(TAG, "onFailure", exception);
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, "Could not start MQTT connect", e);
        }
    }

    /** Publishes the greeting message (QoS 2, not retained) on {@code AndroidPhone}. */
    private static void publishGreeting(MqttAndroidClient client) {
        MqttMessage message = new MqttMessage("Hello, I am an Android Mqtt Client.".getBytes());
        message.setQos(2);
        message.setRetained(false);

        try {
            client.publish(PUBLISH_TOPIC, message, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    // Logged only once delivery has completed, not when it is merely queued.
                    Log.i(TAG, "Message published");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.e(TAG, "Publish failed", exception);
                }
            });
        } catch (MqttException e) {
            // MqttPersistenceException is a subclass and is handled here as well.
            Log.e(TAG, "Publish could not be started", e);
        }
    }

    /** Subscribes to {@code tester} with QoS 1 and logs the outcome. */
    private static void subscribeToTester(MqttAndroidClient client) {
        int qos = 1;
        try {
            client.subscribe(SUBSCRIBE_TOPIC, qos, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    // The subscription was accepted
                    Log.i(TAG, "subscription success");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    // The subscription could not be performed, maybe the user was not
                    // authorized to subscribe on the specified topic e.g. using wildcards
                    Log.e(TAG, "subscription failed", exception);
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, "Subscribe could not be started", e);
        }
    }

}//End of Activity class

//----------------------------------------------------------------------------- 

/**
 * Client callback that logs connection events and incoming messages and answers
 * any message whose payload contains {@code "Alarm Activated"} by publishing an
 * acknowledgement on the {@code Fitlet} topic.
 */
class MqttCallbackHandler implements MqttCallbackExtended {

    private static final String TAG = "mqtt";

    /** Client used to publish the acknowledgement. */
    private final MqttAndroidClient client;

    /**
     * @param client the client this handler publishes with
     */
    public MqttCallbackHandler(MqttAndroidClient client) {
        this.client = client;
    }

    /**
     * Logs a completed connection.
     *
     * @param b whether this was a reconnect
     * @param s the server URI the connection was made to
     */
    @Override
    public void connectComplete(boolean b, String s) {
        // Concatenation keeps the log call safe even if the URI is null.
        Log.w(TAG, "connectComplete: reconnect=" + b + ", serverURI=" + s);
    }

    /** Logs the loss of the connection instead of ignoring it silently. */
    @Override
    public void connectionLost(Throwable throwable) {
        Log.w(TAG, "Connection lost", throwable);
    }

    /** Publishes the alarm acknowledgement on the {@code Fitlet} topic. */
    public void AlarmActivatedMessageReceived() {
        MqttMessage msg = new MqttMessage(
                "Hello, the Mosquitto Broker got your message saying that the Alarm is Activated."
                        .getBytes());
        try {
            this.client.publish("Fitlet", msg);
            Log.i(TAG, "Message published");
        } catch (MqttException e) {
            // MqttPersistenceException is a subclass and is handled here as well.
            Log.e(TAG, "Could not publish the alarm acknowledgement", e);
        }
    }

    /**
     * Logs every incoming payload and triggers the acknowledgement when the payload
     * contains {@code "Alarm Activated"}. The topic is not inspected.
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        String payload = mqttMessage.toString();
        Log.w(TAG, payload);

        if (payload.contains("Alarm Activated")) {
            AlarmActivatedMessageReceived();
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // Nothing to do when a delivery completes.
    }
}
+0

僅供參考:如果您使用Processing,MQTT會更容易實現:http://processing-mqtt.blogspot.com/。Processing代碼可以在Android手機上運行... – CMP