在我的應用程序中,我必須偵聽多個不同的隊列並反序列化/調度在隊列上接收到的傳入消息。用任務替換無限線程循環(消息泵)
實際上,我在做什麼來實現這一目標是每個QueueConnector對象在構造時創建一個新線程,該線程通過對queue.Receive()的阻塞調用執行一個無限循環,以接收隊列中的下一條消息,下面的代碼:
// Instantiate message pump thread
msmqPumpThread = new Thread(() => while (true)
{
// Blocking call (infinite timeout)
// Wait for a new message to come in queue and get it
var message = queue.Receive();
// Deserialize/Dispatch message
DeserializeAndDispatchMessage(message);
}).Start();
我想知道,如果這個「消息泵」,可以使用任務(一個或多個)所取代,而不是通過一個新的線程無限循環下去。
我已經爲消息接收部分做了一個任務(見下面),但我真的不知道如何將它用於消息泵(我可以一遍又一遍記得一遍又一遍地完成同一個任務,在代碼替換單獨的線程無限循環如上?)
Task<Message> GetMessageFromQueueAsync()
{
var tcs = new TaskCompletionSource<Message>();
ReceiveCompletedEventHandler receiveCompletedHandler = null;
receiveCompletedHandler = (s, e) =>
{
queue.ReceiveCompleted -= receiveCompletedHandler;
tcs.SetResult(e.Message);
};
queue.BeginReceive();
return tcs.Task;
}
我將獲得通過使用任務而不是無限循環的任何一個單獨的線程(在此上下文中,阻塞調用=>阻塞線程)?如果是的話,如何正確地做到這一點?
請注意,此應用程序沒有很多QueueConnector對象,並且不會有(可能有10個連接器MAX),這意味着通過第一個解決方案的最大線程數爲10,因此內存佔用量/性能啓動線程不是問題在這裏。我寧願考慮調度性能/ CPU使用率。會有什麼區別嗎?
這是一個tipycal生產者 - 消費者場景,可以使用['BlockingCollection'](http://msdn.microsoft.com/en-us/library/dd267312.aspx)解決,特別是使用['BlockingCollection。GetConsumingEnumerable'](http://msdn.microsoft.com/en-us/library/dd287186.aspx) –
2012-07-05 18:26:49
感謝Paolo! 但據我所知,生產者/消費者隊列更適合於計算綁定任務(執行密集計算),而TaskCompletionSource /異步函數更適合於I/O綁定任務(等待發生的事情)。 由於我的問題處理I/O綁定任務(等待消息進入隊列),我認爲TaskCompletionSource會更加合適。然而,我可能是錯的。 – darkey 2012-07-05 21:29:56