2011-11-18 38 views
3

使用RabbitMQ有一種使用方法,類似於MSSMQ,可以從隊列中彈出1000條消息,然後將其插入數據庫並從那裏繼續。RabbitMQ和C#

我似乎無法做到這一點與一個渠道的訂閱,然後對訂閱中的BasicDeliveryEventArgs做一個foreach,用這個if語句在給定的時間處理我想要處理的最大消息數。

預先感謝 然而,這仍然需要從隊列

using (IConnection connection = factory.CreateConnection()) 
{ 
    using (IModel channel = connection.CreateModel()) 
    { 
     channel.QueueDeclare("****", true, false, false, null); 

     var subscription = new Subscription(channel, "****", false); 
     int maxMessages = 5; 
     int i = 0; 
     foreach (BasicDeliverEventArgs eventArgs in subscription) 
     { 
      if (++i == maxMessages) 
      { 
       Console.WriteLine("Took 5 messages"); 
       subscription.Ack(eventArgs); 
       break; 
      } 
     } 
    } 
} 
+1

我不明白這個問題,特別是「與那做一個if語句」部分。你能澄清一下嗎? –

+0

更新了帖子 – user1053237

+0

對我仍然沒有意義。你想達到什麼目的? –

回答

9

我假設你想通過他們的羣體配料成較大的交易,而優化消息加載到數據庫中的所有22K的消息而不是每個消息的交易費用。隨着強制性的警告,這樣做意味着信息的大型團體可以一起失敗,即使只有其中一個會導致一個問題,這裏是你怎麼會去一下......

QOS設置在通道上:

channel.BasicQos(0, 1000, false); 

這將預取1000條消息並阻止進一步的流量,直到您確認某事。請注意,它不會以1000個塊爲單位獲取。相反,它確保最多可以在任何時候預取最多1000個UNACK'ed消息。模擬塊傳輸與首先處理1000條消息一樣簡單,然後一次確認全部消息。

請參閱herehere以獲取比我更權威的解釋。

還有一點:即使您還沒有創建1000條消息的配額,只要有消息可用,您可能需要刷新隊列。您應該可以通過在foreach循環內調用queue.BasicGet()直到它乾涸,然後將所有內容(包括從subscription中提取的消息)發送到數據庫來完成此操作。警告:我自己沒有嘗試過,所以我可能會說垃圾,但我認爲它會起作用。這種方法的優點在於它可以立即將消息推入數據庫,而無需等待一整批1000條消息。如果數據庫落後於處理太多小事務,預取積壓將在每個週期之間更多地填滿。