2014-09-23 57 views
1

在工作中,我們的一個進程使用SQL數據庫表作爲隊列。我一直在設計一個隊列閱讀器來檢查表中的排隊工作,在工作開始時更新行狀態,並在工作完成時刪除行。我使用Parallel.Foreach給每個進程自己的線程和設定MaxDegreeOfParallelism到4Odd behavior with yield and Parallel.ForEach

當隊列讀取器啓動時,它會檢查任何未完成的工作,並加載工作納入一個列表中,那麼它就是一個Concat與名單以及返回在無限循環中運行的IEnumerable以檢查新工作的方法。這個想法是,應該先處理未完成的工作,然後可以在線程可用的情況下工作。然而,我所看到的是FetchQueuedWork會將隊列表中的幾十行更改爲立即「處理」,但一次只能處理幾個項目。

我想要發生的事情是FetchQueuedWork只會在Parallel.Foreach中打開一個槽時纔會得到新的工作並更新表格。對我來說真的很奇怪,它的行爲與我在本地開發人員環境中運行代碼時的預期完全相同,但在生產中我遇到了上述問題。

我使用.NET 4以下是代碼:

public void Go() 
{ 
    List<WorkData> unfinishedWork = WorkData.LoadUnfinishedWork(); 
    IEnumerable<WorkData> work = unfinishedWork.Concat(FetchQueuedWork());  
    Parallel.ForEach(work, new ParallelOptions { MaxDegreeOfParallelism = 4 }, DoWork); 
} 

private IEnumerable<WorkData> FetchQueuedWork() 
{ 
    while (true) 
    { 
     var workUnit = WorkData.GetQueuedWorkAndSetStatusToProcessing(); 
     yield return workUnit; 
    } 
} 

private void DoWork(WorkData workUnit) 
{ 
    if (!workUnit.Loaded) 
    { 
     System.Threading.Thread.Sleep(5000); 
     return; 
    } 
    Work(); 
} 

回答

3

我懷疑是默認行爲是緩衝輸入(發行模式?)。手了 - 當談到.NET 4.5

List<WorkData> unfinishedWork = WorkData.LoadUnfinishedWork(); 
IEnumerable<WorkData> work = unfinishedWork.Concat(FetchQueuedWork());  
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 }; 
var partitioner = Partitioner.Create(work, EnumerablePartitionerOptions.NoBuffering); 
Parallel.ForEach(partioner, options, DoWork); 
+0

這很有趣。不幸的是,我忘了提及我在.Net 4上,而這個功能只有4.5。 – 2014-09-23 02:43:47

3

Blorgbeard的解決方案是正確的:您可能需要創建自己的分區,並通過它的NoBuffering選項。

如果你被限制到.NET 4,你有幾種選擇:

  • 替換您Parallel.ForEachwork.AsParallel().WithDegreeOfParallelism(4).ForAll(DoWork)。 PLINQ在緩衝物品方面更加保守,所以這應該是個訣竅。

  • 寫你自己的枚舉分區器(祝你好運)。

  • 創建一個難看的信號量劈像這樣:

副作用的用於簡潔起見Select

public void Go() 
{ 
    const int MAX_DEGREE_PARALLELISM = 4; 

    using (var semaphore = new SemaphoreSlim(MAX_DEGREE_PARALLELISM, MAX_DEGREE_PARALLELISM)) 
    { 
     List<WorkData> unfinishedWork = WorkData.LoadUnfinishedWork(); 

     IEnumerable<WorkData> work = unfinishedWork 
      .Concat(FetchQueuedWork()) 
      .Select(w => 
      { 
       // Side-effect: bad practice, but easier 
       // than writing your own IEnumerable. 
       semaphore.Wait(); 

       return w; 
      }); 

     // You still need to specify MaxDegreeOfParallelism 
     // here so as not to saturate your thread pool when 
     // Parallel.ForEach's load balancer kicks in. 
     Parallel.ForEach(work, new ParallelOptions { MaxDegreeOfParallelism = MAX_DEGREE_PARALLELISM }, workUnit => 
     { 
      try 
      { 
       this.DoWork(workUnit); 
      } 
      finally 
      { 
       semaphore.Release(); 
      } 
     }); 
    } 
}