我們從來沒有找到答案,但我們創建了我們自己的解決方法,似乎可以做到這一點。爲了完整起見,我會在這裏發佈。我希望它能幫助別人面對類似的情況。
要求:
- 我們有一個長期運行的任務,將一個硬件服務器上運行。當我說長時間運行時,我的意思是從一天到很多天。
- 我們希望有一個用戶界面,可以在網絡中的任何其他桌面上啓動,以圖形方式查看長時間運行任務的統計數據。
- 用戶界面可以多次啓動和停止,並且可以一次執行多個實例。
- 用戶界面應該不會對長時間運行的任務產生過度的負擔。運行多個用戶界面不應該減慢速度。
設計:
- 的長時間運行的任務包含在一個DLL。有一個帶有run()方法的主類,用於啓動長時間運行的任務。
- 我們已經創建了一個Windows服務,它將在硬件服務器上自動運行。
- Windows服務將創建主類的實例並通過調用run()方法來啓動任務。
- Windows服務還將創建一個ServiceHost實例並啓動一個WCF服務實例。
- Windows服務會將主類的引用傳遞給WCF服務。
- WCF服務將爲主類可以引發的六個事件創建處理程序。
- 從主類到WCF服務的所有通信都是一種方式,通過引發這六個事件。
- UI將成爲WCF服務的客戶端,並且連接將與NetTcp綁定。
- WCF服務有一個subscribe()方法和一個unsubscribe()方法,以便潛在的UI可以加入和離開。
- 當UI調用subscribe()方法時,它將一個唯一標識符作爲字符串傳遞。 WCF服務將標識符及其OperationContext放入ConcurrentDictionary中。
- 當UI調用unsubscribe()方法時,將從ConcurrentDictionary中刪除該條目。
- UI和WCF服務之間的契約具有從WCF服務到客戶端的長時間運行任務可引發的每種類型事件的單向消息。
- 在長時間運行的任務期間引發事件時,WCF服務會處理該事件並遍歷已註冊的UI並向UI發送單向消息。
這一切都是在這一點上工作。
問題:
,因爲我們是壓力測試這個系統,我們創建了一個場景,長時間運行的任務轟炸事件的WCF服務爲快,因爲它可以。這將是最壞的情況,但我們必須能夠處理它。 WCF服務能夠處理事件並將消息放置在Tcp通道上。由於消息是單向的,因此WCF服務不會阻止等待發送的完成,從而使其跟上正在發生的事件。
當用戶界面沒有像服務器推送消息一樣快地將消息拉出時,就會出現問題。消息備份並最終開始超時並導致通道進入故障狀態。我們希望在發生故障之前發現這種情況,以便我們可以開始丟棄消息。不幸的是,我們無法找到檢測此頻道積壓的機制。如果我們將消息更改爲雙向,則WCF服務會阻塞,直到消息完成並且通道不會備份爲止,但這會影響長時間運行的服務並降低速度。不好。
解決方案:
我們在其中包含了長期運行的任務同一個DLL創建一類特殊的解決了這個問題。這個類負責傳回任何連接的用戶界面。該通信對象包含每個要提出的事件的ConcurrentQueue。當長時間運行的任務通常會將事件提升回WCF服務時,它現在會調用此通信對象中的方法。
在此方法中,通信對象會將事件參數輸入到該事件的ConcurrentQueue中。通信對象還有一個方法,該方法在創建對象時在單獨的線程上啓動。這個新方法將持續循環並行Queues並彈出事件參數並實際提升事件。我們將NetTcp調用改爲雙向,因此線程中的例程將綁定到TCP通道的速度,但由於它位於單獨的線程中,因此不會減慢長時間運行任務的主要處理速度。
既然我們有一個ConcurrentQueue,我們可以把握,我們可以檢查積壓。我們有一些限制(在目前的情況下是10),我們邏輯上爲併發隊列設置了限制。當長時間運行的任務調用將事件參數添加到隊列的方法時,它首先檢查隊列的計數,並且如果它小於我們的邏輯限制,它將排隊事件參數,否則它會簡單地丟棄並繼續。這樣長時間運行的隊列的速度不會受到影響,WCF服務將不會備份並導致故障通道狀態。
總結:
我們歡迎任何意見或替代的想法。這似乎對我們來說工作得很好,似乎是有彈性的。
class UI
{
#region Class Scoped Variables
private Int32 _threashold = 10;
private bool _continue = true;
#endregion Class Scoped Variables
#region Public Delegate Definitions
public delegate void OnPlanSelectionChangedDelegate(PlanSelectionChangedEventArgs e);
// other lines deleted for brevity
#endregion Public Delegate Definitions
#region Local Delegate Instances
private OnPlanSelectionChangedDelegate _onPlanSelectionChangedDelegate = null;
// other lines deleted for brevity
#endregion Local Delegate Instances
#region Local Queues for Delegates
private ConcurrentQueue<PlanSelectionChangedEventArgs> _planSelectionChangedQueue
= new ConcurrentQueue<PlanSelectionChangedEventArgs>();
// other lines deleted for brevity
#endregion Local Queues for Delegates
#region Constructor
public UI(OnPlanSelectionChangedDelegate onPlanSelectionChanged)
{
_onPlanSelectionChangedDelegate = onPlanSelectionChanged;
// other lines deleted for brevity
ThreadPool.QueueUserWorkItem(new WaitCallback(DoWork), null);
}
#endregion Constructor
#region Public Methods
public void Shutdown()
{
_continue = false;
}
public void SendPlanSelection(PlanSelectionChangedEventArgs e)
{
if (_planSelectionChangedQueue.Count < _threashold)
{
if (_cntPlanSelectionDropped > 0)
{
e.Dropped = _cntPlanSelectionDropped;
}
_planSelectionChangedQueue.Enqueue(e);
_cntPlanSelectionDropped = 0;
}
else
{
_cntPlanSelectionDropped++;
}
}
// other lines deleted for brevity
#endregion Public Methods
#region Private Asychronous Method
private void DoWork(object dummy)
{
PlanSelectionChangedEventArgs planSelectionChangedEventArgs = null;
while (_continue) // process this loop until told to quit
{
// Plan Selection Changed
// Try to get the next event args in a thread safe way
if (_planSelectionChangedQueue.TryDequeue(out planSelectionChangedEventArgs))
{
// We got an event args from the queue, do we have a valid delegate?
if (_onPlanSelectionChangedDelegate != null)
{
// We have a delegate, call it with the event args and rais the event
_onPlanSelectionChangedDelegate(planSelectionChangedEventArgs);
}
}
// other lines deleted for brevity
}
}
#endregion Private Asychronous Method
}