2014-07-23 94 views
4

我下面的代碼:Parallel.ForEach丟失物品

HttpContext httpContext = HttpContext.Current; 
RequestContext currentContext = RequestContextManager.CurrentContext; 
ILifetimeScope currentSessionScope = PlatformContext.LifeTimeScope; 

ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>(); 
ConcurrentBag<ParallelCalculateObj> forEachResult = new ConcurrentBag<ParallelCalculateObj>(); 
ConcurrentBag<ParallelCalculateObj> testForEachPassResult = new ConcurrentBag<ParallelCalculateObj>(); 

ParallelLoopResult loopResult = Parallel.ForEach(applications,() => 
{ 
    HttpContext.Current = httpContext; 
    RequestContextManager.SetCustomCurrentContext(currentContext); 
    PlatformContext.LifeTimeScope = currentSessionScope; 
    return new ParallelCalculateObj(); 
}, (application, pls, localObj) => 
{ 
    try 
    { 
     // some code 
    } 
    catch (Exception e) 
    { 
     exceptions.Enqueue(e); 
    } 
    testForEachPassResult.Add(localObj); 
    return localObj; 
}, forEachResult.Add); 

其中applications.Count = 3。執行上述代碼後,我得到forEachResult.Count = 2testForEachPassResult.Count = 3

爲什麼forEachResult集合不包含所有元素? 沒有例外,並且ParallelLoopResult.IsCompleted = true

一兩件事,可能會解決我的問題是有幫助的是,這三個項目是在兩個線程運行:

  1. Item01 - > Thread.CurrentThread.ManagedThreadId是14
  2. Item02 - > Thread.CurrentThread .ManagedThreadId是10
  3. Item03 - > Thread.CurrentThread.ManagedThreadId是14

回答

6

我想你是用錯誤的方式使用Parallel.ForEach

您正在使用具有本地狀態的過載。這個本地狀態對分區/線程來說是唯一的,但並不是每個迭代都有唯一的本地狀態。

想象將輸入列表劃分爲分區。然後有N地方國家。作爲最後一步,您已將這些地方州的N合併爲您的最終價值。通常,N將小於列表中的項目數量,除非您使用更具體的過載之一,否則TPL將確定列表分配的方式。

因爲你顯然想用每次迭代的結果填充一些列表,你的本地狀態應該也是一個包含每個特定分區迭代結果的列表。對於最後一個動作你把所有的列表到一個單一的名單:

Parallel.ForEach(
    applications, 
    () => new List<ParallelCalculateObj>(), 
    (application, pls, localObj) => 
    { 
     // do something 
     var obj = new ParallelCalculateObj { /* data of the iteration */ }; 
     localObj.Add(obj); 
     return localObj; 
    }, 
    localObj => 
    { 
     foreach (var result in localObj) 
     { 
      forEachResult.Add(result); 
     } 
    }); 

請注意,如果你那樣做那麼值的順序forEachResult將不符合項的順序applications。如果你想這樣做,那麼你必須使用ParallelLoopState類的索引。

+0

非常感謝!但是,你能向我解釋什麼是使用localFinally部分的目的?我不能只是將項目添加到循環體中的集合? –

+0

@MateuszPuwałowskilocalFinally將每個分區的結果合併爲一個最終值。以您的問題爲例:您有兩個分區,第一個分區包含Item01和Item03以及第二個Item02。這兩個本地列表包含處理Item01和Item03的結果以及Item02的結果。 localfinally步驟然後將這兩個列表合併成一個包含所有三個結果的列表。 – Dirk

1
嘗試

lock(testForEachPassResult){ 
    testForEachPassResult.Add(localObj); 
    } 

最有可能在添加元素時,您沒有列表的最新狀態。請記住,列表中可能會有另一個元素從另一個線程同時添加。所以,當它改變時,你將新項目添加到舊版本的testForEachPassResult。

如果您鎖定列表,所有其他線程將等待列表解鎖。

+3

這就是我使用System.Collections.Concurrent命名空間的原因。它是線程安全的。 –

+0

謝謝Mez和Mateusz。這節省了一天(今天,今天是星期五!)).... – granadaCoder