在工作中,我們的一個進程使用SQL數據庫表作爲隊列。我一直在設計一個隊列閱讀器來檢查表中的排隊工作,在工作開始時更新行狀態,並在工作完成時刪除行。我使用Parallel.Foreach
給每個進程自己的線程和設定MaxDegreeOfParallelism
到4Odd behavior with yield and Parallel.ForEach
當隊列讀取器啓動時,它會檢查任何未完成的工作,並加載工作納入一個列表中,那麼它就是一個Concat
與名單以及返回在無限循環中運行的IEnumerable
以檢查新工作的方法。這個想法是,應該先處理未完成的工作,然後可以在線程可用的情況下工作。然而,我所看到的是FetchQueuedWork
會將隊列表中的幾十行更改爲立即「處理」,但一次只能處理幾個項目。
我想要發生的事情是FetchQueuedWork
只會在Parallel.Foreach
中打開一個槽時纔會得到新的工作並更新表格。對我來說真的很奇怪,它的行爲與我在本地開發人員環境中運行代碼時的預期完全相同,但在生產中我遇到了上述問題。
我使用.NET 4以下是代碼:
public void Go()
{
List<WorkData> unfinishedWork = WorkData.LoadUnfinishedWork();
IEnumerable<WorkData> work = unfinishedWork.Concat(FetchQueuedWork());
Parallel.ForEach(work, new ParallelOptions { MaxDegreeOfParallelism = 4 }, DoWork);
}
private IEnumerable<WorkData> FetchQueuedWork()
{
while (true)
{
var workUnit = WorkData.GetQueuedWorkAndSetStatusToProcessing();
yield return workUnit;
}
}
private void DoWork(WorkData workUnit)
{
if (!workUnit.Loaded)
{
System.Threading.Thread.Sleep(5000);
return;
}
Work();
}
這很有趣。不幸的是,我忘了提及我在.Net 4上,而這個功能只有4.5。 – 2014-09-23 02:43:47