2010-09-21 267 views
8

我想發送消息給RabbitMQ服務器,然後等待回覆消息(在「回覆」隊列中)。當然,我不想永遠等待處理這些消息的應用程序停機 - 這需要超時。這聽起來像是一項非常基本的任務,但我找不到辦法做到這一點。我現在用Java API遇到了這個問題。RabbitMQ等待超時消息

回答

0

com.rabbitmq.client.QueueingConsumernextDelivery(long timeout)方法,它會做你想要什麼。但是,這已被棄用。 編寫你自己的超時並不難,儘管有一個正在進行的線程和一個實時標識符列表可能會更好,而不是始終添加和刪除使用者和關聯的超時線程。

修改即可添加:回覆後注意日期!

0

我使用C#通過創建一個對象跟蹤對特定消息的響應來處理此問題。它爲消息設置一個唯一的回覆隊列,並訂閱它。如果在指定的時間內未收到響應,則倒數計時器會取消訂閱,這會刪除隊列。另外,我擁有可以從我的主線程(使用信號量)或異步(使用回調)同步的方法來利用此功能。

基本上,實施看起來是這樣的:

//Synchronous case: 
//Throws TimeoutException if timeout happens 
var msg = messageClient.SendAndWait(theMessage); 

//Asynchronous case 
//myCallback receives an exception message if there is a timeout 
messageClient.SendAndCallback(theMessage, myCallback); 
2

的RabbitMQ的Java客戶端庫現在supports a timeout argument to its QueueConsumer.nextDelivery() method

例如,RPC教程使用下面的代碼:

channel.basicPublish("", requestQueueName, props, message.getBytes()); 

while (true) { 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    if (delivery.getProperties().getCorrelationId().equals(corrId)) { 
     response = new String(delivery.getBody()); 
     break; 
    } 
} 

現在,你可以使用consumer.nextDelivery(1000)等待最大一秒鐘。如果達到超時,則該方法返回null

channel.basicPublish("", requestQueueName, props, message.getBytes()); 

while (true) { 
    // Use a timeout of 1000 milliseconds 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000); 

    // Test if delivery is null, meaning the timeout was reached. 
    if (delivery != null && 
     delivery.getProperties().getCorrelationId().equals(corrId)) { 
     response = new String(delivery.getBody()); 
     break; 
    } 
}