2012-06-22 58 views
8

有沒有人知道如何編寫一個在PLINQ中返回ParallelQuery的擴展函數?如何爲PLINQ編寫線程感知擴展函數?

更具體地說,我有以下問題:我想在一個PLINQ查詢中執行一個需要引擎的轉換,其創建成本很高,並且不能同時訪問。

我能做到以下幾點:

var result = source.AsParallel().Select ((i) => { var e = new Engine(); return e.Process(i); }) 

這裏創建每個項目一旦發動機,這是太貴了。

我想引擎創建一次每個線程

與骨料,我可以接近我想要的東西,如

// helper class: engine to use plus list of results obtained in thread so far 
class EngineAndResults { 
    public Engine engine = null; 
    public IEnumerable<ResultType> results; 
} 

var result = source.AsParallel().Aggregate (

    // done once per block of items (=thread), 
    // returning an empty list, but a new engine 
    () => new EngineAndList() { 
     engine = new Engine(), 
     results = Enumerable.Empty<ResultType>() 
    }, 

    // we process a new item and put it to the thread-local list, 
    // preserving the engine for further use 
    (engineAndResults, item) => new EngineAndResults() { 
     engine = engineAndResults.engine, 
     results = Enumerable.Concat (
      engineAndResults.results, 
      new ResultType [] { engineAndResults.engine.Process (item) } 
     ) 
    }, 

    // tell linq how to aggregate across threads 
    (engineAndResults1, engineAndResults2) => new EngineAndResults() { 
     engine = engineAndResults1.engine, 
     results = Enumerable.Concat (engineAndResults1.results, engineAndResults2.results) 
    }, 

    // after all aggregations, how do we come to the result? 
    engineAndResults => engineAndResults.results 
); 

正如你所看到的,我誤用蓄電池進行每個線程的引擎。這裏的問題是PLINQ最終將結果聚合成單個IEnumerable,這會導致線程同步。如果我想在後面添加另一個PLINQ擴展,這不是很好。

我希望像

var result = source.AsParallel() 
        .SelectWithThreadwiseInitWhichIAmLookingFor (
         () => new Engine(), 
         (engine, item) => engine.Process (item) 
      ) 

沒有任何人有任何想法如何實現這一目標?

回答

5

您可以使用ThreadLocal<T>來做到這一點。例如:

var engine = new ThreadLocal<Engine>(() => new Engine()); 
var result = source.AsParallel() 
        .Select(item => engine.Value.Process(item)); 
+0

謝謝。這是一個很好的解決方案。我做了一個簡短的測試,似乎工作得很好。我試圖找到一種方法將初始化放入擴展函數,但沒有成功 - 顯然必須在調用AsParallel之前創建ThreadLocal。我沒有看到原因,但無論如何,這不是一個大問題。 – JohnB

+0

我認爲這是行不通的,因爲你爲每次迭代創建一個新的'ThreadLocal',所以不能在同一個線程上執行迭代。所有在同一個線程上運行的迭代都需要'ThreadLocal'的同一個實例。 – svick