我試圖實現一個相當簡單的生產者/消費者風格的應用程序與多個生產者和一個消費者。生產者/消費者模式與批生產者
研究使我到BlockingCollection<T>
這是有用的,讓我實現如下一個長期運行的消費者任務:
var c1 = Task.Factory.StartNew(() =>
{
var buffer = new List<int>(BATCH_BUFFER_SIZE);
foreach (var value in blockingCollection.GetConsumingEnumerable())
{
buffer.Add(value);
if (buffer.Count == BATCH_BUFFER_SIZE)
{
ProcessItems(buffer);
buffer.Clear();
}
}
});
的ProcessItems
函數提交緩衝到數據庫和它在批量工作。然而,這個解決方案是次優的。在男爵製作期間,可能需要一段時間才能填充緩衝區,這意味着數據庫已過期。
更理想的解決方案是在30秒計時器上運行該任務或用超時短接foreach
。
我跑了定時器的想法,並與此想出了:
syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000);
private static void TimerElapsed(object state)
{
var buffer = new List<int>();
var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable();
foreach (var value in collection)
{
buffer.Add(value);
}
ProcessItems(buffer);
buffer.Clear();
}
這具有foreach
將被阻止,直到最後,擊敗定時器的目的明確的問題。
任何人都可以提供一個方向嗎?我基本上需要定期對BlockingCollection
進行快照,並處理內容。也許BlockingCollection
是錯誤的類型?
我這個實現這完美的作品去了。它每隔x秒運行一次,只推送可用的(不等待)。謝謝你的幫助。 https://gist.github.com/AntSwift/d2faa4f43d1a6d594172 –
一個完整的工作版本現在可在https://github.com/AntSwift/AS.BlockingCollectionDemo –
我相信你的代碼示例會受到競爭條件的影響,因爲定時器機制。如果您的消費者無法在計時器期限內完成任務,則計時器將觸發而不會導致產生另一個消費者。 –