8
有沒有人知道如何編寫一個在PLINQ中返回ParallelQuery的擴展函數?如何爲PLINQ編寫線程感知擴展函數?
更具體地說,我有以下問題:我想在一個PLINQ查詢中執行一個需要引擎的轉換,其創建成本很高,並且不能同時訪問。
我能做到以下幾點:
var result = source.AsParallel().Select ((i) => { var e = new Engine(); return e.Process(i); })
這裏創建每個項目一旦發動機,這是太貴了。
我想引擎創建一次每個線程。
與骨料,我可以接近我想要的東西,如
// helper class: engine to use plus list of results obtained in thread so far
class EngineAndResults {
public Engine engine = null;
public IEnumerable<ResultType> results;
}
var result = source.AsParallel().Aggregate (
// done once per block of items (=thread),
// returning an empty list, but a new engine
() => new EngineAndList() {
engine = new Engine(),
results = Enumerable.Empty<ResultType>()
},
// we process a new item and put it to the thread-local list,
// preserving the engine for further use
(engineAndResults, item) => new EngineAndResults() {
engine = engineAndResults.engine,
results = Enumerable.Concat (
engineAndResults.results,
new ResultType [] { engineAndResults.engine.Process (item) }
)
},
// tell linq how to aggregate across threads
(engineAndResults1, engineAndResults2) => new EngineAndResults() {
engine = engineAndResults1.engine,
results = Enumerable.Concat (engineAndResults1.results, engineAndResults2.results)
},
// after all aggregations, how do we come to the result?
engineAndResults => engineAndResults.results
);
正如你所看到的,我誤用蓄電池進行每個線程的引擎。這裏的問題是PLINQ最終將結果聚合成單個IEnumerable,這會導致線程同步。如果我想在後面添加另一個PLINQ擴展,這不是很好。
我希望像
var result = source.AsParallel()
.SelectWithThreadwiseInitWhichIAmLookingFor (
() => new Engine(),
(engine, item) => engine.Process (item)
)
沒有任何人有任何想法如何實現這一目標?
謝謝。這是一個很好的解決方案。我做了一個簡短的測試,似乎工作得很好。我試圖找到一種方法將初始化放入擴展函數,但沒有成功 - 顯然必須在調用AsParallel之前創建ThreadLocal。我沒有看到原因,但無論如何,這不是一個大問題。 – JohnB
我認爲這是行不通的,因爲你爲每次迭代創建一個新的'ThreadLocal',所以不能在同一個線程上執行迭代。所有在同一個線程上運行的迭代都需要'ThreadLocal'的同一個實例。 – svick