0
我嘗試在我的一個 Android 應用程序中使用 AMQP 消息隊列。我首先在 Java 應用程序中測試了代碼,沒有任何問題;但是當我將相同的代碼放入 Android 應用程序時,雖然沒有報錯,AMQP 通道卻在收到第一條消息後意外關閉。我試圖給通道添加一個關閉監聽器(ShutdownListener),但它沒有輸出任何信息。有人能幫我弄清楚爲什麼 RabbitMQ 通道會關閉嗎?(標題:Android 與 AMQP)
// ------------------------------------------------------------------------
// Function to make AMQP connection and subscribe
// ------------------------------------------------------------------------
/**
 * Connects to the AMQP broker, declares the topic exchange and the receive
 * queue, binds the queue to the exchange, and starts an auto-acknowledging
 * consumer on that queue.
 *
 * <p>The outcome is reported through {@code connectionStatus}: it is set to
 * {@code AMQP_CONNECTING} on entry, {@code AMQP_CONNECTED} once the broker
 * connection is open, and one of the {@code AMQP_NOTCONNECTED_*} values when
 * any setup step fails. When setup fails, no consumer is registered and any
 * partially opened connection is aborted.
 *
 * <p>NOTE(review): Android forbids network I/O on the main thread; confirm
 * that callers invoke this method from a background thread.
 *
 * @return always 0; callers should inspect {@code connectionStatus}
 */
int connAmqp()
{
    factory = new ConnectionFactory();
    factory.setHost(PI_AMQP_BROKER_URL);
    factory.setUsername(AMQP_BROKER_USERNAME);
    factory.setPassword(AMQP_BROKER_PASSWORD);
    factory.setPort(AMQP_BROKER_PORT);

    connection = null;
    connectionStatus = AMQP_CONNECTING;

    // Tracks whether every setup step completed, so a consumer is never
    // registered on a missing or stale channel.
    boolean ready = false;
    try {
        connection = factory.newConnection();
        Log.i("log_amqp_conn","Successfully connected to AMQP broker");
        connectionStatus = AMQP_CONNECTED;

        channel = connection.createChannel();
        channel.addShutdownListener(new ShutdownListener() {
            @Override
            public void shutdownCompleted(ShutdownSignalException cause) {
                // getCause() is frequently null for a shutdown signal, so
                // log the signal itself (its text carries the close reason)
                // rather than dereferencing the nested cause.
                Log.w("log_amqp_shutdown", String.valueOf(cause), cause);
            }
        });

        channel.exchangeDeclare(AMQP_EXCHANGE_NAME, "topic");
        Log.i("log_amqp_conn","Successfully connected to Exchange: " + AMQP_EXCHANGE_NAME);

        channel.queueDeclare(RX_QUEUE_NAME, false, false, false, null);
        Log.i("log_amqp_conn","Successfully declared queue: " + RX_QUEUE_NAME);

        channel.queueBind(RX_QUEUE_NAME, AMQP_EXCHANGE_NAME, RX_BINDING);
        Log.i("log_amqp_conn","Successfully binding to: " + RX_BINDING);

        ready = true;
    } catch (TimeoutException e) {
        Log.i("log_amqp_conn","Connection timeount - Failed to connect to AMQP broker");
        Log.i("log_amqp_conn",e.toString());
        connectionStatus = AMQP_NOTCONNECTED_TIMEOUT;
    } catch (IOException | RuntimeException e) {
        // Same coverage as the former separate IOException / Exception
        // handlers, which had identical bodies.
        Log.i("log_amqp_conn","Failed to connect to AMQP broker");
        Log.i("log_amqp_conn",e.toString());
        connectionStatus = AMQP_NOTCONNECTED_UNKNOWNREASON;
    }

    if (!ready) {
        // Setup failed part-way: release the broker connection if it was
        // opened, and do not attempt to consume.
        if (connection != null) {
            connection.abort();
            connection = null;
        }
        return 0;
    }

    Log.i("log_amqp_conn"," [*] Waiting for messages.");
    consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            Log.i("log_amqp_conn", "handleDelivery() called");
            String message = new String(body, "UTF-8");
            Log.i("log_amqp_conn"," [Rx] Received: " + message);
            // NOTE(review): this callback runs on a client-library thread,
            // not the Android UI thread. An exception escaping from the work
            // done here may cause the client to close the channel, depending
            // on the configured exception handler - verify against the
            // RabbitMQ client version in use.
            //DO SOME WORK HERE
        }
    };

    try {
        Log.i("log_amqp_conn", "basicConsume() called");
        // Second argument true = automatic acknowledgement.
        channel.basicConsume(RX_QUEUE_NAME, true, consumer);
    } catch (IOException e) {
        Log.e("log_amqp_consume",e.toString());
    }
    return 0;
}