2013-02-12 26 views
9

我不確定使用在Parallel.ForEach當地的初始化函數,因爲在MSDN文章中描述:http://msdn.microsoft.com/en-us/library/dd997393.aspxParallel ForEach的本地初始化如何工作?

Parallel.ForEach<int, long>(nums, // source collection 
    () => 0, // method to initialize the local variable 
    (j, loop, subtotal) => // method invoked by the loop on each iteration 
    { 
     subtotal += nums[j]; //modify local variable 
     return subtotal; // value to be passed to next iteration 
    },... 

怎樣()=> 0初始化什麼?變量的名稱是什麼,以及如何在循環邏輯中使用它?

+0

()=>不初始化任何東西,該函數的返回值將被用來初始化局部變量(小計,在你的例子)。 – 2013-02-12 11:22:30

回答

19

參照Parallel.ForEach靜態擴展方法的following overload

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source, 
    Func<TLocal> localInit, 
    Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody, 
    Action<TLocal> localFinally 
) 

在您的具體例子

線:

() => 0, // method to initialize the local variable 

是一個簡單的拉姆達(匿名功能)將返回恆定的整數零。此拉姆達被傳遞作爲localInit參數Parallel.ForEach - 因爲拉姆達返回一個整數,它具有鍵入Func<int>並鍵入TLocal可以由編譯器被推斷爲int(類似地,TSource可以從作爲參數source越過集合的類型推斷)

然後將返回值(0)作爲第三個參數(名爲subtotal)傳遞給taskBodyFunc。這個(0)用於體循環中的初始種子:

(j, loop, subtotal) => 
{ 
    subtotal += nums[j]; //modify local variable (Bad idea, see comment) 
    return subtotal;  // value to be passed to next iteration 
} 

此第二拉姆達(傳遞給taskBody)被稱爲N次,其中N是由TPL分區分配給該任務的項目數。

到第二taskBody拉姆達每個後續呼叫將通過的subTotal新的值,有效地計算一運行局部總,對於該任務。在添加了分配給該任務的所有項目之後,將調用第三個也是最後一個localFinally函數參數,並再次傳遞從taskBody返回的subtotal的最終值。由於多個此類任務將並行運行,因此還需要最後一步將所有部分總計加入最終「總計」中。但是,由於多個併發任務(在不同的線程上)可能會爭用變量,因此以線程安全的方式對其進行更改非常重要。

(我已經改變了MSDN變量的名稱,以使其更清晰)

long grandTotal = 0; 
Parallel.ForEach(nums,   // source collection 
() => 0,      // method to initialize the local variable 
    (j, loop, subtotal) =>   // method invoked by the loop on each iteration 
    subtotal + nums[j],   // value to be passed to next iteration subtotal 
    // The final value of subtotal is passed to the localFinally function parameter 
    (subtotal) => Interlocked.Add(ref grandTotal, subtotal) 

在MS例,任務體內參數大部的修改是一個貧窮的做法,和不必要的。即代碼subtotal += nums[j]; return subtotal;會像剛剛return subtotal + nums[j];可能省略爲拉姆達速記投影更好(j, loop, subtotal) => subtotal + nums[j]

一般來說

Parallel.For/Parallel.ForEachlocalInit/body/localFinally重載允許一次,每個任務初始化和清理代碼(分別)在任務執行迭代之前和之後運行。

(注意到對於範圍/可枚舉傳遞給平行For/Foreach將被劃分成的IEnumerable<>批次,其中的每一個將被分配一個任務)

每個任務localInit將被調用一次,body代碼將被重複調用,每批次一次(0..N次),localFinally將在調用完成後調用一次。

此外,還可以通過從localInit Func通用TLocal返回值傳遞的任務(即到taskBodylocalFinally代表)期間所需的任何狀態 - 我下面這個變量taskLocals調用。「localInit」的

常見用途:

  • 創建和初始化通過循環體需要昂貴的資源,如數據庫連接或Web服務連接。
  • 保持任務局部變量來保存(無競爭)運行總計或集合
  • 如果您需要從localInit返回多個對象的taskBodylocalFinally,你可以使用強類型類的,一個Tuple<,,>或者,如果您只使用lambdas作爲localInit/taskBody/localFinally,您還可以通過匿名類傳遞數據。請注意,如果您使用從localInit返回的分享多個任務中的引用類型,則需要考慮此對象上的線程安全性 - 不變性是更可取的。

常見的 「localFinally」 行動的用途:

  • 爲了釋放資源,如在taskLocals使用IDisposables(如數據庫連接,文件處理,Web服務客戶端等)
  • 將每個任務完成的工作彙總/合併/減少回共享變量。這些共享變量會爭辯,所以線程安全性是一個問題:
    • 例如在原始的類型,如整數
    • lock或類似Interlocked.Increment將需要寫操作
    • 利用concurrent collections的,以節省時間和精力。

taskBody是循環操作的一部分tight - 你要優化這個性能。

這一切最好的一個註釋過的例子總結:

public void MyParallelizedMethod() 
{ 
    // Shared variable. Not thread safe 
    var itemCount = 0; 

    Parallel.For(myEnumerable, 
    // localInit - called once per Task. 
    () => 
    { 
     // Local `task` variables have no contention 
     // since each Task can never run by multiple threads concurrently 
     var sqlConnection = new SqlConnection("connstring..."); 
     sqlConnection.Open(); 

     // This is the `task local` state we wish to carry for the duration of the task 
     return new 
     { 
      Conn = sqlConnection, 
      RunningTotal = 0 
     } 
    }, 
    // Task Body. Invoked once per item in the batch assigned to this task 
    (item, loopState, taskLocals) => 
    { 
     // ... Do some fancy Sql work here on our task's independent connection 
     using(var command = taskLocals.Conn.CreateCommand()) 
     using(var reader = command.ExecuteReader(...)) 
     { 
     if (reader.Read()) 
     { 
      // No contention for `taskLocal` 
      taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]); 
     } 
     } 
     // The same type of our `taskLocal` param must be returned from the body 
     return taskLocals; 
    }, 
    // LocalFinally called once per Task after body completes 
    // Also takes the taskLocal 
    (taskLocals) => 
    { 
     // Any cleanup work on our Task Locals (as you would do in a `finally` scope) 
     if (taskLocals.Conn != null) 
     taskLocals.Conn.Dispose(); 

     // Do any reduce/aggregate/synchronisation work. 
     // NB : There is contention here! 
     Interlocked.Add(ref itemCount, taskLocals.RunningTotal); 
    } 

還有更多的例子:

Example of per-Task uncontended dictionaries

Example of per-Task database connections

+0

刪除數據庫示例並用CPU綁定的示例進行替換。對數據庫使用Parallel.For可能不是一個好主意 - 通過Task.WhenAll進行異步/等待和並行是比較好的選擇。 – StuartLC 2017-12-21 10:48:06

2

您可以在MSDN中獲得提示correct Parallel.ForEach過載。

對於參與循環執行的每個線程調用一次localInit委託,並返回每個這些任務的初始本地狀態。這些初始狀態被傳遞給每個任務的第一次正文調用。然後,每個後續的主體調用都會返回一個可能被修改的狀態值,該值將傳遞給下一個主體調用。

在您的例子() => 0只是返回0代表,所以這個值用於第一次迭代上每個任務

+0

我在發佈之前已經閱讀過這篇文章,但是,我的問題依然如此。如何使用只返回0的lambda提供了一個「用於每個任務的第一次迭代」的值?我如何在代碼中使用這個值?它沒有標識符。 – 2013-02-12 12:35:35

+3

它有 - 它是你的例子中的「小計」。但僅限於每個任務的第一次迭代。在該任務的所有其他迭代中的「小計」的值是從先前迭代返回的值。 – 2013-02-12 12:37:31

+0

好吧,我跟着你到目前爲止,但這段代碼的哪一部分表明「()=> 0」=小計?init函數和(j,循環,小計)聲明之間的約定是什麼? 編輯:我只是想通了,魔鬼是在細節,或在這種情況下,在正文函數System.Func 的通用參數的名稱。謝謝,我會標記你的答案。 – 2013-02-12 13:31:12

6

作爲@Honza Brestan的答案的擴展。 Parallel foreach將工作分解爲任務的方式也很重要,它會將幾次循環迭代分組爲單個任務,因此在實踐中每n次循環調用一次localInit(),並且可以同時啓動多個組。

localInitlocalFinally關鍵是要確保平行foreach循環可以組合每個itteration結果到一個結果沒有你需要在body指定鎖定報表,要做到這一點,你必須爲提供一個初始化要創建的值(localInit),那麼每個body迭代可以處理本地值,然後提供一種方法來以線程安全的方式組合每個組的值(localFinally)。

如果您不需要localInit來同步任務,則可以使用lambda方法正常引用周圍環境中的值,而不會出現任何問題。請參閱Threading in C# (Parallel.For and Parallel.ForEach)瞭解關於使用localInit/Finally的深入教程,並向下滾動至使用本地值進行優化,Joseph Albahari確實是我所有線程的goto源碼。

+0

投票,因爲這真的爲我澄清的事情。謝謝。 – 2013-02-13 08:36:50

0

從我身邊一點點簡單的例子

class Program 
{ 
    class Person 
    { 
     public int Id { get; set; } 
     public string Name { get; set; } 
     public int Age { get; set; } 
    } 

    static List<Person> GetPerson() => new List<Person>() 
    { 
     new Person() { Id = 0, Name = "Artur", Age = 26 }, 
     new Person() { Id = 1, Name = "Edward", Age = 30 }, 
     new Person() { Id = 2, Name = "Krzysiek", Age = 67 }, 
     new Person() { Id = 3, Name = "Piotr", Age = 23 }, 
     new Person() { Id = 4, Name = "Adam", Age = 11 }, 
    }; 

    static void Main(string[] args) 
    { 
     List<Person> persons = GetPerson(); 
     int ageTotal = 0; 

     Parallel.ForEach 
     (
      persons, 
      () => 0, 
      (person, loopState, subtotal) => subtotal + person.Age, 
      (subtotal) => Interlocked.Add(ref ageTotal, subtotal) 
     ); 

     Console.WriteLine($"Age total: {ageTotal}"); 
     Console.ReadKey(); 
    } 
}