2013-03-13 45 views
0

我有一個WCF客戶端和使用NetTcp的服務器。服務器位於Windows服務中的ServiceHost中。客戶訂閱WCF服務並註冊其回調接口及其InstanceContext。回調接口有幾個單向方法調用。我把它打開了。監視等待傳輸的TCP消息隊列

這一切都很好。但是,在我的測試中,我的Windows服務中有一段代碼,它通過一個緊密循環,通過單向方法調用之一將消息儘可能快地發送回客戶端。我已經超過了TCP連接傳遞數據的能力,結果是消息排隊。這是我的預期。

現在的問題是:服務器上有任何方法來了解如何備份隊列,以便我可以限制基於實時吞吐量發送消息的速度嗎?

回答

0

我們從來沒有找到答案,但我們創建了我們自己的解決方法,似乎可以做到這一點。爲了完整起見,我會在這裏發佈。我希望它能幫助別人面對類似的情況。

要求:

  1. 我們有一個長期運行的任務,將一個硬件服務器上運行。當我說長時間運行時,我的意思是從一天到很多天。
  2. 我們希望有一個用戶界面,可以在網絡中的任何其他桌面上啓動,以圖形方式查看長時間運行任務的統計數據。
  3. 用戶界面可以多次啓動和停止,並且可以一次執行多個實例。
  4. 用戶界面應該不會對長時間運行的任務產生過度的負擔。運行多個用戶界面不應該減慢速度。

設計:

  1. 的長時間運行的任務包含在一個DLL。有一個帶有run()方法的主類,用於啓動長時間運行的任務。
  2. 我們已經創建了一個Windows服務,它將在硬件服務器上自動運行。
  3. Windows服務將創建主類的實例並通過調用run()方法來啓動任務。
  4. Windows服務還將創建一個ServiceHost實例並啓動一個WCF服務實例。
  5. Windows服務會將主類的引用傳遞給WCF服務。
  6. WCF服務將爲主類可以引發的六個事件創建處理程序。
  7. 從主類到WCF服務的所有通信都是一種方式,通過引發這六個事件。
  8. UI將成爲WCF服務的客戶端,並且連接將與NetTcp綁定。
  9. WCF服務有一個subscribe()方法和一個unsubscribe()方法,以便潛在的UI可以加入和離開。
  10. 當UI調用subscribe()方法時,它將一個唯一標識符作爲字符串傳遞。 WCF服務將標識符及其OperationContext放入ConcurrentDictionary中。
  11. 當UI調用unsubscribe()方法時,將從ConcurrentDictionary中刪除該條目。
  12. UI和WCF服務之間的契約具有從WCF服務到客戶端的長時間運行任務可引發的每種類型事件的單向消息。
  13. 在長時間運行的任務期間引發事件時,WCF服務會處理該事件並遍歷已註冊的UI並向UI發送單向消息。

這一切都是在這一點上工作。

問題:

,因爲我們是壓力測試這個系統,我們創建了一個場景,長時間運行的任務轟炸事件的WCF服務爲快,因爲它可以。這將是最壞的情況,但我們必須能夠處理它。 WCF服務能夠處理事件並將消息放置在Tcp通道上。由於消息是單向的,因此WCF服務不會阻止等待發送的完成,從而使其跟上正在發生的事件。

當用戶界面沒有像服務器推送消息一樣快地將消息拉出時,就會出現問題。消息備份並最終開始超時並導致通道進入故障狀態。我們希望在發生故障之前發現這種情況,以便我們可以開始丟棄消息。不幸的是,我們無法找到檢測此頻道積壓的機制。如果我們將消息更改爲雙向,則WCF服務會阻塞,直到消息完成並且通道不會備份爲止,但這會影響長時間運行的服務並降低速度。不好。

解決方案:

我們在其中包含了長期運行的任務同一個DLL創建一類特殊的解決了這個問題。這個類負責傳回任何連接的用戶界面。該通信對象包含每個要提出的事件的ConcurrentQueue。當長時間運行的任務通常會將事件提升回WCF服務時,它現在會調用此通信對象中的方法。

在此方法中,通信對象會將事件參數輸入到該事件的ConcurrentQueue中。通信對象還有一個方法,該方法在創建對象時在單獨的線程上啓動。這個新方法將持續循環並行Queues並彈出事件參數並實際提升事件。我們將NetTcp調用改爲雙向,因此線程中的例程將綁定到TCP通道的速度,但由於它位於單獨的線程中,因此不會減慢長時間運行任務的主要處理速度。

既然我們有一個ConcurrentQueue,我們可以把握,我們可以檢查積壓。我們有一些限制(在目前的情況下是10),我們邏輯上爲併發隊列設置了限制。當長時間運行的任務調用將事件參數添加到隊列的方法時,它首先檢查隊列的計數,並且如果它小於我們的邏輯限制,它將排隊事件參數,否則它會簡單地丟棄並繼續。這樣長時間運行的隊列的速度不會受到影響,WCF服務將不會備份並導致故障通道狀態。

總結:

我們歡迎任何意見或替代的想法。這似乎對我們來說工作得很好,似乎是有彈性的。

class UI 
{ 
    #region Class Scoped Variables 
    private Int32 _threashold = 10; 
    private bool _continue = true; 
    #endregion Class Scoped Variables 

    #region Public Delegate Definitions 
    public delegate void OnPlanSelectionChangedDelegate(PlanSelectionChangedEventArgs e); 
    // other lines deleted for brevity 
    #endregion Public Delegate Definitions 

    #region Local Delegate Instances 
    private OnPlanSelectionChangedDelegate _onPlanSelectionChangedDelegate = null; 
    // other lines deleted for brevity 
    #endregion Local Delegate Instances 

    #region Local Queues for Delegates 
    private ConcurrentQueue<PlanSelectionChangedEventArgs> _planSelectionChangedQueue 
     = new ConcurrentQueue<PlanSelectionChangedEventArgs>(); 
    // other lines deleted for brevity 
    #endregion Local Queues for Delegates 

    #region Constructor 
    public UI(OnPlanSelectionChangedDelegate onPlanSelectionChanged) 
    { 
     _onPlanSelectionChangedDelegate = onPlanSelectionChanged; 
     // other lines deleted for brevity 
     ThreadPool.QueueUserWorkItem(new WaitCallback(DoWork), null); 
    } 
    #endregion Constructor 

    #region Public Methods 
    public void Shutdown() 
    { 
     _continue = false; 
    } 
    public void SendPlanSelection(PlanSelectionChangedEventArgs e) 
    { 
     if (_planSelectionChangedQueue.Count < _threashold) 
     { 
      if (_cntPlanSelectionDropped > 0) 
      { 
       e.Dropped = _cntPlanSelectionDropped; 
      } 
      _planSelectionChangedQueue.Enqueue(e); 
      _cntPlanSelectionDropped = 0; 
     } 
     else 
     { 
      _cntPlanSelectionDropped++; 
     } 
    } 
    // other lines deleted for brevity 
    #endregion Public Methods 

    #region Private Asychronous Method 
    private void DoWork(object dummy) 
    { 
     PlanSelectionChangedEventArgs planSelectionChangedEventArgs = null; 
     while (_continue) // process this loop until told to quit 
     { 
      // Plan Selection Changed 
      // Try to get the next event args in a thread safe way 
      if (_planSelectionChangedQueue.TryDequeue(out planSelectionChangedEventArgs)) 
      { 
       // We got an event args from the queue, do we have a valid delegate? 
       if (_onPlanSelectionChangedDelegate != null) 
       { 
        // We have a delegate, call it with the event args and rais the event 
        _onPlanSelectionChangedDelegate(planSelectionChangedEventArgs); 
       } 
      } 

      // other lines deleted for brevity 
     } 
    } 
    #endregion Private Asychronous Method 
}