2011-10-08 74 views
2

我有這個問題,其中一個系統包含推動要處理的消息的節點(windows服務),以及其他推送消息並處理它們的節點。Windows服務通過MSMQ進行通信 - 我需要服務總線嗎?

設計時,推送節點通過在每次發送後維護一個循環隊列列表和循環隊列來平衡隊列之間的負載。因此,消息1將進入隊列1,消息2到隊列2等。到目前爲止,該部分工作得很好。

在消息的拉尾處,我們設計了這樣一個消息,即以類似的方式檢索消息 - 首先從隊列1,然後從隊列2等。理論上,每個拉節點位於不同的機器上,並且在實踐中很遠,它只在一個隊列中進行監聽。但最近的一項要求讓我們在一臺偵聽多個隊列的計算機上有一個拉節點:一個通常非常繁忙,充滿了數百萬條消息,而另一條通常只包含少量消息。

我們面臨的問題是,我們最初構建的拉節點的方式是從隊列到隊列,直到找到消息。如果它超時(例如在一秒之後),則它移動到下一個隊列。

這不再工作原因Q1(充滿了數百萬條消息)每條消息將被延遲大約一秒,因爲從每次從Q1拉出後,我們會向Q2詢問一條消息(如果它不包含任何消息,我們將等待第二)。

因此,它是這樣的:

Q1包含10個消息和Q2不含有

  • 拉節點請求從Q1
  • Q1一個消息返回消息立即
  • 拉節點請求來自Q1的消息
  • ------------等待一秒-------------(Q2爲空且請求超時)
  • 拉節點請求消息從Q1
  • Q1返回消息立即
  • 拉節點從Q1
  • 詢問消息------------等待一個第二---- ---------(Q2是空的,請求超時)

因此,這顯然是錯誤的。

我想我在這裏尋找最好的建築解決方案。消息處理不需要儘可能實時,但需要健壯,並且不應丟失任何消息!

我想聽聽您對這個問題的看法。

預先感謝 雅尼斯

+0

你可以避免所有的麻煩,每個隊列使用一個處理線程,從而使每個隊列處理獨立於其他... – Yahia

+0

Yahia我已經結束了做你喜歡你的建議。如果有人需要,我會發布代碼以備將來參考 – Yannis

回答

1

我最終創建了一組線程 - 每個msmq需要處理一個線程。在構造我初始化那些線程:

Storages.ForEach(queue => 
     { 
      Task task = Task.Factory.StartNew(() => 
      { 
       LoggingManager.LogInfo("Starting a local thread to read in mime messages from queue " + queue.Name, this.GetType()); 
       while (true) 
       { 
        WorkItem mime = queue.WaitAndRetrieve(); 
        if (mime != null) 
        { 
         _Semaphore.WaitOne(); 
         _LocalStorage.Enqueue(mime); 

         lock (_locker) Monitor.Pulse(_locker); 

         LoggingManager.LogDebug("Adding no. " + _LocalStorage.Count + " item in queue", this.GetType()); 
        } 
       } 
      }); 
     }); 
  • 的_LocalStorage是線程安全Queue實現(ConcurrentQueue引入在.NET 4.0)

  • 信號量是一個計數信號以控制插入在_LocalStorage。 _LocalStorage基本上是收到消息的緩衝區,但我們不希望它在處理節點忙於工作時變得太大。其效果可能是我們檢索到_LocalStorage中的所有msmq消息,但正忙於處理其中的5個左右。這對於彈性(如果程序意外終止,我們會丟失所有這些消息)以及在性能方面都很差,因爲將所有這些項目保存在內存中的內存消耗將非常巨大。所以我們需要控制我們在_LocalStorage緩衝區隊列中保存的項目數量。

  • 我們脈衝線程等待工作(見下文),一個新的項目加入到隊列中做一個簡單的Monitor.Pulse

,從隊列中出隊的工作項目的代碼如下:

lock (_locker) 
      if (_LocalStorage.Count == 0) 
       Monitor.Wait(_locker); 

     WorkItem result; 
     if (_LocalStorage.TryDequeue(out result)) 
     { 
      _Semaphore.Release(); 
      return result; 
     } 

     return null; 

我希望這可以幫助某人解決類似的問題。

1

也許你可以使用MessageQueue類的ReceiveCompleted事件?不需要再輪詢。