2013-07-24 35 views
6

集成I採用了基於代碼我實現的並行/消費者的this questionParallel.ForEach陷入僵局時BlockingCollection

class ParallelConsumer<T> : IDisposable 
{ 
    private readonly int _maxParallel; 
    private readonly Action<T> _action; 
    private readonly TaskFactory _factory = new TaskFactory(); 
    private CancellationTokenSource _tokenSource; 
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>(); 
    private Task _task; 

    public ParallelConsumer(int maxParallel, Action<T> action) 
    { 
     _maxParallel = maxParallel; 
     _action = action; 
    } 

    public void Start() 
    { 
     try 
     { 
      _tokenSource = new CancellationTokenSource(); 
      _task = _factory.StartNew(
       () => 
       { 
        Parallel.ForEach(
         _entries.GetConsumingEnumerable(), 
         new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token }, 
         (item, loopState) => 
         { 
          Log("Taking" + item); 
          if (!_tokenSource.IsCancellationRequested) 
          { 
           _action(item); 
           Log("Finished" + item); 
          } 
          else 
          { 
           Log("Not Taking" + item); 
           _entries.CompleteAdding(); 
           loopState.Stop(); 
          } 
         }); 
       }, 
       _tokenSource.Token); 
     } 
     catch (OperationCanceledException oce) 
     { 
      System.Diagnostics.Debug.WriteLine(oce); 
     } 
    } 

    private void Log(string message) 
    { 
     Console.WriteLine(message); 
    } 

    public void Stop() 
    { 
     Dispose(); 
    } 

    public void Enqueue(T entry) 
    { 
     Log("Enqueuing" + entry); 
     _entries.Add(entry); 
    } 

    public void Dispose() 
    { 
     if (_task == null) 
     { 
      return; 
     } 

     _tokenSource.Cancel(); 
     while (!_task.IsCanceled) 
     { 
     } 

     _task.Dispose(); 
     _tokenSource.Dispose(); 
     _task = null; 
    } 
} 

,這裏是一個測試代碼

class Program 
{ 
    static void Main(string[] args) 
    { 
     TestRepeatedEnqueue(100, 1); 
    } 

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount) 
    { 
     bool[] flags = new bool[itemCount]; 
     var consumer = new ParallelConsumer<int>(parallelCount, 
               (i) => 
               { 
                flags[i] = true; 
               } 
      ); 
     consumer.Start(); 
     for (int i = 0; i < itemCount; i++) 
     { 
      consumer.Enqueue(i); 
     } 
     Thread.Sleep(1000); 
     Debug.Assert(flags.All(b => b == true)); 



    } 
} 

的測試總是失敗 - 它始終停留在100次測試中的93個項目左右。任何想法我的代碼的哪一部分導致了這個問題,以及如何解決它?

回答

8

您不能使用Parallel.Foreach()BlockingCollection.GetConsumingEnumerable(),因爲您已經發現。

有關說明,請參閱本博客文章:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

該博客還提供了一個名爲GetConsumingPartitioner(),你可以用它來解決問題方法的源代碼。

BlockingCollection的GetConsumingEnumerable實現使用BlockingCollection的內部同步其已經支持多個用戶併發,但的ForEach不知道,它的枚舉的分割邏輯也需要採取鎖:從博客

摘錄同時訪問枚舉。

因此,這裏的同步比實際需要更多,從而導致潛在的不可忽略的性能影響。

[也]默認情況下,由Parallel.ForEach和PLINQ使用的分區算法使用分塊以最小化同步成本:而不是每個元素都取一次鎖,它會佔用鎖,獲取一組元素(一個塊),然後釋放鎖。

雖然此設計可以幫助提高整體吞吐量,但對於更多關注低延遲的情況,該組塊可能會令人望而卻步。

+0

謝謝。這解決了我的問題。無論如何,當我進一步測試時,當項目數是該序列的成員時,我的OP中的代碼不會失敗,例如[A200672](http://oeis.org/A200672)。 1,2,3,5,7,9,13,17,21,29,37,45,61,77,93,......任何想法爲什麼?只是好奇。 – user69715

+0

@ user69715這是我在嘗試做類似事情時發現的那種奇怪的行爲。我想這只是Parallel.ForEach()和底層BlockingCollection之間的一些奇怪的交互,但我無法真正解釋它。 –

2

失敗的原因是由於以下原因所解釋here

默認受僱於Parallel.ForEach和PLINQ使用,以便分塊既 以最小化 同步成本的分區算法:相當比每個元素取一次鎖, 它會佔用鎖,抓住一組元素(一塊),然後釋放鎖。

爲了得到它的工作,你可以在你的ParallelConsumer<T>類添加一個方法,表明添加完畢,如下

public void StopAdding() 
    { 
     _entries.CompleteAdding(); 
    } 

現在調用這個方法您for loop後,如下

 consumer.Start(); 
     for (int i = 0; i < itemCount; i++) 
     { 
      consumer.Enqueue(i); 
     } 
     consumer.StopAdding(); 

否則,Parallel.ForEach()將等待達到閾值以抓取塊並開始處理。

+0

事情是在生產中,任務連續排隊,所以標記「StopAdding」沒有幫助。感謝您的回答,+1,但我會與其他答案一起去。 – user69715

+0

糟糕,看起來我還不能+1 – user69715