參照Parallel.ForEach
靜態擴展方法的following overload:
public static ParallelLoopResult ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody,
Action<TLocal> localFinally
)
在您的具體例子
線:
() => 0, // method to initialize the local variable
是一個簡單的拉姆達(匿名功能)將返回恆定的整數零。此拉姆達被傳遞作爲localInit
參數Parallel.ForEach
- 因爲拉姆達返回一個整數,它具有鍵入Func<int>
並鍵入TLocal
可以由編譯器被推斷爲int
(類似地,TSource
可以從作爲參數source
越過集合的類型推斷)
然後將返回值(0)作爲第三個參數(名爲subtotal
)傳遞給taskBody
Func
。這個(0)用於體循環中的初始種子:
(j, loop, subtotal) =>
{
subtotal += nums[j]; //modify local variable (Bad idea, see comment)
return subtotal; // value to be passed to next iteration
}
此第二拉姆達(傳遞給taskBody
)被稱爲N次,其中N是由TPL分區分配給該任務的項目數。
到第二taskBody
拉姆達每個後續呼叫將通過的subTotal
新的值,有效地計算一運行局部總,對於該任務。在添加了分配給該任務的所有項目之後,將調用第三個也是最後一個localFinally
函數參數,並再次傳遞從taskBody
返回的subtotal
的最終值。由於多個此類任務將並行運行,因此還需要最後一步將所有部分總計加入最終「總計」中。但是,由於多個併發任務(在不同的線程上)可能會爭用變量,因此以線程安全的方式對其進行更改非常重要。
(我已經改變了MSDN變量的名稱,以使其更清晰)
long grandTotal = 0;
Parallel.ForEach(nums, // source collection
() => 0, // method to initialize the local variable
(j, loop, subtotal) => // method invoked by the loop on each iteration
subtotal + nums[j], // value to be passed to next iteration subtotal
// The final value of subtotal is passed to the localFinally function parameter
(subtotal) => Interlocked.Add(ref grandTotal, subtotal)
在MS例,任務體內參數大部的修改是一個貧窮的做法,和不必要的。即代碼subtotal += nums[j]; return subtotal;
會像剛剛return subtotal + nums[j];
可能省略爲拉姆達速記投影更好(j, loop, subtotal) => subtotal + nums[j]
一般來說
的Parallel.For/Parallel.ForEach的localInit/body/localFinally
重載允許一次,每個任務初始化和清理代碼(分別)在任務執行迭代之前和之後運行。
(注意到對於範圍/可枚舉傳遞給平行For
/Foreach
將被劃分成的IEnumerable<>
批次,其中的每一個將被分配一個任務)
在每個任務,localInit
將被調用一次,body
代碼將被重複調用,每批次一次(0..N
次),localFinally
將在調用完成後調用一次。
此外,還可以通過從localInit Func
通用TLocal
返回值傳遞的任務(即到taskBody
和localFinally
代表)期間所需的任何狀態 - 我下面這個變量taskLocals
調用。「localInit」的
常見用途:
- 創建和初始化通過循環體需要昂貴的資源,如數據庫連接或Web服務連接。
- 保持任務局部變量來保存(無競爭)運行總計或集合
- 如果您需要從
localInit
返回多個對象的taskBody
和localFinally
,你可以使用強類型類的,一個Tuple<,,>
或者,如果您只使用lambdas作爲localInit/taskBody/localFinally
,您還可以通過匿名類傳遞數據。請注意,如果您使用從localInit
返回的分享多個任務中的引用類型,則需要考慮此對象上的線程安全性 - 不變性是更可取的。
常見的 「localFinally」 行動的用途:
- 爲了釋放資源,如在
taskLocals
使用IDisposables
(如數據庫連接,文件處理,Web服務客戶端等)
- 將每個任務完成的工作彙總/合併/減少回共享變量。這些共享變量會爭辯,所以線程安全性是一個問題:
的taskBody
是循環操作的一部分tight
- 你要優化這個性能。
這一切最好的一個註釋過的例子總結:
public void MyParallelizedMethod()
{
// Shared variable. Not thread safe
var itemCount = 0;
Parallel.For(myEnumerable,
// localInit - called once per Task.
() =>
{
// Local `task` variables have no contention
// since each Task can never run by multiple threads concurrently
var sqlConnection = new SqlConnection("connstring...");
sqlConnection.Open();
// This is the `task local` state we wish to carry for the duration of the task
return new
{
Conn = sqlConnection,
RunningTotal = 0
}
},
// Task Body. Invoked once per item in the batch assigned to this task
(item, loopState, taskLocals) =>
{
// ... Do some fancy Sql work here on our task's independent connection
using(var command = taskLocals.Conn.CreateCommand())
using(var reader = command.ExecuteReader(...))
{
if (reader.Read())
{
// No contention for `taskLocal`
taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]);
}
}
// The same type of our `taskLocal` param must be returned from the body
return taskLocals;
},
// LocalFinally called once per Task after body completes
// Also takes the taskLocal
(taskLocals) =>
{
// Any cleanup work on our Task Locals (as you would do in a `finally` scope)
if (taskLocals.Conn != null)
taskLocals.Conn.Dispose();
// Do any reduce/aggregate/synchronisation work.
// NB : There is contention here!
Interlocked.Add(ref itemCount, taskLocals.RunningTotal);
}
還有更多的例子:
Example of per-Task uncontended dictionaries
Example of per-Task database connections
()=>不初始化任何東西,該函數的返回值將被用來初始化局部變量(小計,在你的例子)。 – 2013-02-12 11:22:30