2014-09-10 50 views
0

The basic RabbitMQ tutorial給出瞭如何從隊列中不斷獲取消息的示例:在C#中,如何處理當前隊列中的所有RabbitMQ消息?

var factory = new ConnectionFactory() { HostName = "localhost" }; 
using (var connection = factory.CreateConnection()) 
{ 
    using (var channel = connection.CreateModel()) 
    { 
     channel.QueueDeclare("hello", false, false, false, null); 

     var consumer = new QueueingBasicConsumer(channel); 
     channel.BasicConsume("hello", true, consumer); 

     Console.WriteLine(" [*] Waiting for messages." + 
           "To exit press CTRL+C"); 
     while (true) 
     { 
      var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 

      var body = ea.Body; 
      var message = Encoding.UTF8.GetString(body); 
      Console.WriteLine(" [x] Received {0}", message); 
     } 
    } 
} 

我想要做的就是找回已被放置到隊列,然後停止所有消息。

這裏是如果我下午一點開始我的代碼,將解決我的問題

  1. 兩個例子,我想處理所有已放置到隊列前下午1點的消息。

OR

  • 如果我在13:00:00開始我的代碼,它需要10秒鐘,我的代碼運行,我不介意,如果它包括在13:00:00和13:00:10之間放置在隊列中的消息,只要隊列一空就停止。
  • 我意識到我可能會在我的郵件中添加一個時間戳並檢查該郵件,或者我可以擺弄超時值,但我想知道是否有任何內置方法可以正確執行此操作。

    在此先感謝。

    +2

    從你的描述,它聽起來就像排隊是不正確的工具,隊列爲了緩衝消息,使得它們的生成速度比它們可以消耗的速度更快,隊列不會被設計爲在你決定處理它們之前持續消息。儘管RabitMQ支持持久隊列,但這更多的是容錯。您應該始終在隊列中立即處理消息。如果您稍後想要對消息進行處理,則可以從某個持久性數據存儲庫(如數據庫)中保存並檢索它們。 – 2014-09-10 10:59:09

    +0

    感謝您的評論,本。 – 2014-09-10 11:04:36

    +0

    它聽起來像一個MQ不是你的問題的解決方案。你可以做的是創建一個名爲processQueue的布爾變量並將其設置爲true。然後,進行處理並將該變量更新爲false。 while循環將改爲:while(processQueue){// ...} – 2014-09-10 11:10:41

    回答

    3

    從評論的集成功能似乎的RabbitMQ並不意味着批量處理,所以它並沒有被設計用於此目的。

    我也注意到,當測試DequeueNoWait方法或嘗試以零超時進行Dequeue時,它們根本不起作用,只是返回null。

    以下解決方案使用QueueDeclare得到現有消息的計數,並且不需要時間戳或哈克超時:

    var factory = new ConnectionFactory { HostName = "localhost" }; 
    using (var connection = factory.CreateConnection()) 
    { 
        using (var channel = connection.CreateModel()) 
        { 
         var queueDeclareResponse = channel.QueueDeclare(Constants.QueueName, false, false, false, null); 
    
         var consumer = new QueueingBasicConsumer(channel); 
         channel.BasicConsume(Constants.QueueName, true, consumer); 
    
         Console.WriteLine(" [*] Processing existing messages."); 
    
         for (int i = 0; i < queueDeclareResponse.MessageCount; i++) 
         { 
          var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
          var body = ea.Body; 
          var message = Encoding.UTF8.GetString(body); 
          Console.WriteLine(" [x] Received {0}", message); 
         } 
         Console.WriteLine("Finished processing {0} messages.", queueDeclareResponse.MessageCount); 
         Console.ReadLine(); 
        } 
    } 
    
    +0

    不要使用'QueueingBasicConsumer'它沒有內置的恢復,使用EventingConsumer – PUG 2015-08-18 19:45:25

    1

    你可以做什麼:

    1)首先時間戳添加到消息對象

    2)拒絕的消息,如果時間戳是無效的。它的RabbitMQ

    Reference

    +0

    謝謝你的回答,EvilFonti。我在我的問題中說過,「我意識到我可以在我的消息中加上時間戳並檢查它,或者我可以擺弄超時值,但我想知道是否有任何內置的方法來正確執行此操作。」但出於興趣,用什麼方法來獲取使用索引的元素? – 2014-09-10 12:41:55

    +0

    我很抱歉,我以爲我用了一個索引來訪問rabbitmq中的消息(也許這是MSMQ的情況,但我不知道了......)。那是不可能的,但你可以拒絕它 - >我編輯了答案。 – EvilFonti 2014-09-10 13:09:22

    +0

    如果在應用程序啓動後沒有消息發送一段時間,則此解決方案意味着必須等待超時。我想這意味着人們需要設置一個短暫的超時時間,但足夠長的時間來檢索現有的消息。 – 2014-09-10 13:30:35

    相關問題