2017-09-18 157 views
0

我嘗試使用在我的一個Android應用程序中使用AMQP消息隊列。我首先在java應用程序中測試了代碼,並且沒有任何問題,但是當我將相同的代碼放入Android應用程序時,沒有發生錯誤,但AMQP通道在收到第一條消息後意外關閉。我試圖添加一個關閉監聽器的通道,但它沒有返回任何東西。有人能幫我弄清楚爲什麼Rabbitmq頻道正在關閉?Android與AMQP

// ------------------------------------------------------------------------ 
// Function to make AMQP connection and subscribe 
// ------------------------------------------------------------------------ 
int connAmqp() 
{ 
    factory = new ConnectionFactory(); 
    factory.setHost(PI_AMQP_BROKER_URL); 
    factory.setUsername(AMQP_BROKER_USERNAME); 
    factory.setPassword(AMQP_BROKER_PASSWORD); 
    factory.setPort(AMQP_BROKER_PORT); 
    connection=null; 
    connectionStatus = AMQP_CONNECTING; 
    try { 

     connection = factory.newConnection(); 
     Log.i("log_amqp_conn","Successfully connected to AMQP broker"); 
     connectionStatus = AMQP_CONNECTED; 
     channel = connection.createChannel(); 

     channel.addShutdownListener(new ShutdownListener() { 
      @Override 
      public void shutdownCompleted(ShutdownSignalException cause) { 
       Log.w("log_amqp_shutdown",cause.getCause().toString()); 
      } 
     }); 

     channel.exchangeDeclare(AMQP_EXCHANGE_NAME, "topic"); 
     Log.i("log_amqp_conn","Successfully connected to Exchange: " + AMQP_EXCHANGE_NAME); 
     channel.queueDeclare(RX_QUEUE_NAME, false, false, false, null); 
     String queueBind = RX_BINDING; 
     Log.i("log_amqp_conn","Successfully declared queue: " + RX_QUEUE_NAME); 
     channel.queueBind(RX_QUEUE_NAME, AMQP_EXCHANGE_NAME, queueBind); 
     Log.i("log_amqp_conn","Successfully binding to: " + queueBind); 
    } catch (TimeoutException e) { 
     Log.i("log_amqp_conn","Connection timeount - Failed to connect to AMQP broker"); 
     Log.i("log_amqp_conn",e.toString()); 
     connectionStatus = AMQP_NOTCONNECTED_TIMEOUT; 
    } catch (IOException e) { 
     Log.i("log_amqp_conn","Failed to connect to AMQP broker"); 
     Log.i("log_amqp_conn",e.toString()); 
     connectionStatus = AMQP_NOTCONNECTED_UNKNOWNREASON; 
    }catch (Exception e) { 
     Log.i("log_amqp_conn","Failed to connect to AMQP broker"); 
     Log.i("log_amqp_conn",e.toString()); 
     connectionStatus = AMQP_NOTCONNECTED_UNKNOWNREASON; 
    } 

    Log.i("log_amqp_conn"," [*] Waiting for messages."); 

    consumer = new DefaultConsumer(channel) { 
     @Override 
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 
       throws IOException { 
      Log.i("log_amqp_conn", "handleDelivery() called"); 
      String message = new String(body, "UTF-8"); 
      Log.i("log_amqp_conn"," [Rx] Received: " + message); 

      //DO SOME WORK HERE 

     } 
    }; 
    try { 
     Log.i("log_amqp_conn", "basicConsume() called"); 
     channel.basicConsume(RX_QUEUE_NAME, true, consumer); 
    } catch (IOException e) { 
     Log.e("log_amqp_consume",e.toString()); 
    } 

    return 0; 
} 

回答

0

花費太多的時間在找錯了地方,我發現通道被關閉,由於在「//做一些工作HERE」代碼handleDelivery()函數未處理的異常後。事實證明,如果這種方法錯誤,AMQP通道將關閉。謝謝。