2017-05-22 44 views
9

RabbitMQ Wait for a message with a timeoutWait for a single RabbitMQ message with a timeout中的解決方案似乎不起作用,因爲官方C#庫中沒有下一個傳遞方法,並且QueueingBasicConsumer被刪除,所以它只是在任何地方拋出NotSupportedException異常。C#RabbitMQ爲指定的超時等待一條消息?

我如何等待隊列中的單個消息指定超時?

PS

它可以通過Basic.Get(),是的,但很好,這是不好解決拉specififed間隔消息(超額流量,多餘的CPU)來完成。

更新

EventingBasicConsumer通過implmenetation不支持立即取消。即使您在某個時候調用BasicCancel,即使您指定通過BasicQos進行預取,它仍將在中取回,並且這些幀可以包含多個消息。所以,單個任務執行不好。不要打擾 - 它不適用於單條消息。

回答

5

有很多方法可以做到這一點。例如,您可以用ManualResetEvent一起使用EventingBasicConsumer,像這樣的(這只是用於演示目的 - 更好地利用以下方法之一):

var factory = new ConnectionFactory(); 
using (var connection = factory.CreateConnection()) { 
    using (var channel = connection.CreateModel()) { 
     // setup signal 
     using (var signal = new ManualResetEvent(false)) { 
      var consumer = new EventingBasicConsumer(channel); 
      byte[] messageBody = null;       
      consumer.Received += (sender, args) => { 
       messageBody = args.Body; 
       // process your message or store for later 
       // set signal 
       signal.Set(); 
      };    
      // start consuming 
      channel.BasicConsume("your.queue", false, consumer); 
      // wait until message is received or timeout reached 
      bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10)); 
      // cancel subscription 
      channel.BasicCancel(consumer.ConsumerTag); 
      if (timeout) { 
       // timeout reached - do what you need in this case 
       throw new Exception("timeout"); 
      } 

      // at this point messageBody is received 
     } 
    } 
} 

正如你在註釋中規定 - 如果你希望在同一個隊列多個消息,這不是最好的方法。那麼這不是最好的方法,我只是爲了演示ManualResetEvent在案例庫本身不提供超時支持的情況下使用它。

如果您正在進行RPC(遠程過程調用,請求 - 答覆) - 您可以在服務器端使用SimpleRpcClientSimpleRpcServer。客戶端看起來就像這樣:

var client = new SimpleRpcClient(channel, "your.queue"); 
client.TimeoutMilliseconds = 10 * 1000; 
client.TimedOut += (sender, args) => { 
    // do something on timeout 
};      
var reply = client.Call(myMessage); // will return reply or null if timeout reached 

更簡單的方法:使用基本Subscription類(它使用內部同樣EventingBasicConsumer,但支持超時,所以你不必自己實現),像這樣:

var sub = new Subscription(channel, "your.queue"); 
BasicDeliverEventArgs reply; 
if (!sub.Next(10 * 1000, out reply)) { 
    // timeout 
} 
+0

第一個解決方案無效。 BasicConsume不能保證在BasicCancel上停止使用,它可以稍後做這個,因爲它實際上是在執行rabbit(只是嘗試每個請求使用一條消息,並且在某些情況下,您會看到幾次分配messageBody)。您仍然需要重發多餘的消息。對於第二個和第三個,我會嘗試現在=) – eocron

+0

雖然你的上半部分是無關的,訂閱類正是我想要的!謝謝,它的作品奇妙!你能編輯答案,以便其他人知道最後一個解決了嗎? – eocron

+0

不過,它通過實現存儲了一堆消息,而我只需要一個=/ – eocron