2012-07-19 35 views
97

在metro應用程序中,我需要執行一些WCF調用。有大量的呼叫要做,所以我需要做一個並行循環。問題是並行循環在WCF調用全部完成之前退出。嵌套等待在Parallel.ForEach

你會如何重構這個按預期工作?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; 
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>(); 

Parallel.ForEach(ids, async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    var cust = await repo.GetCustomer(i); 
    customers.Add(cust); 
}); 

foreach (var customer in customers) 
{ 
    Console.WriteLine(customer.ID); 
} 

Console.ReadKey(); 

回答

101

Parallel.ForEach()背後的全部想法是,你有一組線程,每個線程處理集合的一部分。正如您注意到的,這不適用於async - await,您希望在異步調用期間釋放該線程。

您可以通過阻止ForEach()線程來「修復」,但是這會破壞整個點async - await

你可以做的是使用TPL Dataflow而不是Parallel.ForEach(),它支持異步Task

具體來說,您的代碼可以使用TransformBlock來編寫,它使用async lambda將每個id轉換爲Customer。該塊可以配置爲並行執行。您可以將該塊鏈接到ActionBlock,該ActionBlock將每個Customer寫入控制檯。 設置完成後,您可以將Post()的每個ID設置爲TransformBlock

在代碼:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; 

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i => 
    { 
     ICustomerRepo repo = new CustomerRepo(); 
     return await repo.GetCustomer(i); 
    }, new ExecutionDataflowBlockOptions 
    { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
    }); 
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID)); 
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions 
    { 
     PropagateCompletion = true 
    }); 

foreach (var id in ids) 
    getCustomerBlock.Post(id); 

getCustomerBlock.Complete(); 
writeCustomerBlock.Completion.Wait(); 

雖然你可能想在TransformBlock的並行限制一些小的常量。另外,您可以限制TransformBlock的容量,並使用SendAsync()異步添加項目,例如,如果該集合太大。

與您的代碼相比(如果有效的話),作爲一個額外的好處是隻要單個項目完成就可以開始寫入,而不是等到所有處理完成。

+1

一個非常簡要概述,反應性擴展,TPL和TPL數據流 - http://vantsuyoshi.wordpress.com/2012/01/05/when-to-use-tpl-async-reactive-extension-tpl-dataflow /對於像我這樣可能需要一些清晰度的人。 – 2013-09-13 11:04:54

+1

我很確定這個答案不會並行處理。我相信你需要在id上做一個Parallel.ForEach並將它們發佈到getCustomerBlock。至少這是我在測試這個建議時發現的。 – JasonLind 2015-12-16 22:23:26

+2

@JasonLind它確實如此。並行使用'Parallel.ForEach()''Post()'項目不應該有任何實際效果。 – svick 2015-12-16 22:26:02

79

svick's answer是(像往常一樣)優秀。

但是,如果實際上有大量數據要傳輸,我發現Dataflow更有用。或者當您需要一個兼容async的隊列時。

在你的情況,一個簡單的解決方法就是使用async風格的並行:

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; 

var customerTasks = ids.Select(i => 
    { 
    ICustomerRepo repo = new CustomerRepo(); 
    return repo.GetCustomer(i); 
    }); 
var customers = await Task.WhenAll(customerTasks); 

foreach (var customer in customers) 
{ 
    Console.WriteLine(customer.ID); 
} 

Console.ReadKey(); 
+7

如果你想手動限制並行(在這種情況下你很可能會這樣做),這樣做會更復雜。 – svick 2012-07-19 16:50:56

+1

好點。數據流具有很好的旋鈕。 – 2012-07-19 16:51:37

+0

但是你說得對,Dataflow可能非常複雜(例如,與'Parallel.ForEach()'相比)。但我認爲它是目前最好的選擇,可以對集合進行幾乎所有的「異步」工作。 – svick 2012-07-19 16:53:10

47

使用數據流爲svick建議可能是矯枉過正,和斯蒂芬的回答並沒有提供手段來控制併發操作。然而,可以相當簡單地實現:

public static async Task RunWithMaxDegreeOfConcurrency<T>(
    int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory) 
{ 
    var activeTasks = new List<Task>(maxDegreeOfConcurrency); 
    foreach (var task in collection.Select(taskFactory)) 
    { 
     activeTasks.Add(task); 
     if (activeTasks.Count == maxDegreeOfConcurrency) 
     { 
      await Task.WhenAny(activeTasks.ToArray()); 
      //observe exceptions here 
      activeTasks.RemoveAll(t => t.IsCompleted); 
     } 
    } 
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    { 
     //observe exceptions in a manner consistent with the above 
    }); 
} 

ToArray()調用可以通過優化使用數組而不是列表和更換完成的任務,但我懷疑這會掙很多在大多數情況下的差別。每OP的問題示例用法:

RunWithMaxDegreeOfConcurrency(10, ids, async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    var cust = await repo.GetCustomer(i); 
    customers.Add(cust); 
}); 

編輯研究員SO用戶和TPL奇才Eli Arbel向我指出一個related article from Stephen Toub。像往常一樣,他的實現既優雅又高效:

public static Task ForEachAsync<T>(
     this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
     from partition in Partitioner.Create(source).GetPartitions(dop) 
     select Task.Run(async delegate { 
      using (partition) 
       while (partition.MoveNext()) 
        await body(partition.Current).ContinueWith(t => 
          { 
           //observe exceptions 
          }); 

     })); 
} 
+0

愛Eli阿貝爾選項。有幾個後續問題:我很想跟蹤進展情況。我在方法中添加了'ref int done',然後'ContinueWith done ++'但是「不能在匿名方法,lambda表達式或查詢表達式中使用ref或out參數......任何想法如何跟蹤進度? – Stefanvds 2015-07-30 07:29:48

+0

沒關係,我可以在foreachasync代碼中堅持完成++ – Stefanvds 2015-07-30 07:43:38

+0

我發現ForEachAsync代碼不能按預期工作,至少,並非總是出於某種原因(我目前無法解釋發生了什麼)。使用'dop = 5'在調用代碼時我會得到不同的結果(我應該始終得到相同的結果 - 數據不會改變)!小心! – Shaamaan 2015-11-13 15:04:34

7

包裹Parallel.Foreach成Task.Run()和而不是等待關鍵字使用[yourasyncmethod]。結果

(你需要做Task.Run事情不會阻止UI線程)

事情是這樣的:

var yourForeachTask = Task.Run(() => 
     { 
      Parallel.ForEach(ids, i => 
      { 
       ICustomerRepo repo = new CustomerRepo(); 
       var cust = repo.GetCustomer(i).Result; 
       customers.Add(cust); 
      }); 
     }); 
await yourForeachTask; 
+3

這是什麼問題?我已經完成了它,讓'Parallel.ForEach'完成並行工作,直到所有的都完成了,然後把整個事情推到一個後臺線程來獲得一個響應式的用戶界面,這有什麼問題嗎?也許這是一個睡眠線程太多,但它是短的,可讀的代碼 – ygoe 2015-06-17 18:22:41

+0

@LonelyPixel我唯一的問題是它調用'Task.Run' w母雞'TaskCompletionSource'是可取的。 – Gusdor 2016-03-30 13:31:43

+1

@Gusdor好奇 - 爲什麼'TaskCompletionSource'更可取? – Seafish 2016-07-13 14:34:08

6

這應該是比讓整個TPL數據流的工作相當有效,也更容易荷蘭國際集團:

var customers = await ids.SelectAsync(async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    return await repo.GetCustomer(i); 
}); 

... 

public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4) 
{ 
    var results = new List<TResult>(); 

    var activeTasks = new HashSet<Task<TResult>>(); 
    foreach (var item in source) 
    { 
     activeTasks.Add(selector(item)); 
     if (activeTasks.Count >= maxDegreesOfParallelism) 
     { 
      var completed = await Task.WhenAny(activeTasks); 
      activeTasks.Remove(completed); 
      results.Add(completed.Result); 
     } 
    } 

    results.AddRange(await Task.WhenAll(activeTasks)); 
    return results; 
} 
+0

不應該使用'await'如下:'var customers = await ids.SelectAsync(async i => {...});'? – Paccc 2014-12-14 04:02:25

+0

@pacc:你說的沒錯。固定。 – 2014-12-14 04:20:34

1

我有點晚了黨,但你可能要考慮使用GetAwaiter.GetResult()運行在同步方面的異步代碼,但下面平行中;

Parallel.ForEach(ids, i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    // Run this in thread which Parallel library occupied. 
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult(); 
    customers.Add(cust); 
}); 
7

您可以使用新的AsyncEnumerator NuGet Package節省工作量,這在4年前當問題最初發布時並不存在。它可以讓你控制並行度:

using System.Collections.Async; 
... 

await ids.ParallelForEachAsync(async i => 
{ 
    ICustomerRepo repo = new CustomerRepo(); 
    var cust = await repo.GetCustomer(i); 
    customers.Add(cust); 
}, 
maxDegreeOfParallelism: 10); 

免責聲明:我是AsyncEnumerator庫,它是開源的,MIT許可下的作者,和我張貼此消息只是爲了幫助社區。

+4

謝爾蓋,你應該透露你是圖書館 – 2017-12-16 22:55:04

+2

好的作者,補充了免責聲明。我不想從廣告中獲得任何好處,只想幫助別人;) – 2018-02-24 18:01:23

+0

非常有用的圖書館,希望很快就會被納入Core – rekiem87 2018-02-27 23:09:20

1

引入了一堆的輔助方法後,您將能夠運行並行查詢這個簡單的sintax:

const int DegreeOfParallelism = 10; 
IEnumerable<double> result = await Enumerable.Range(0, 1000000) 
    .Split(DegreeOfParallelism) 
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false)) 
    .ConfigureAwait(false); 

這裏發生的是我們分裂源收集到10塊(.Split(DegreeOfParallelism)),然後運行10個任務每個處理它的項目一個接一個(.SelectManyAsync(...)),並將它們合併到一個列表中。

值得一提的是有個簡單的方法:

double[] result2 = await Enumerable.Range(0, 1000000) 
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false)) 
    .WhenAll() 
    .ConfigureAwait(false); 

但它需要一個預防:如果你有一個源的集合,是太大了,它會chedule爲每個項目一個Task向右走,這可能會導致顯着的性能點擊。在上述外表實施例中使用

擴展方法如下:異步的

public static class CollectionExtensions 
{ 
    /// <summary> 
    /// Splits collection into number of collections of nearly equal size. 
    /// </summary> 
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount) 
    { 
     if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount)); 

     List<T> source = src.ToList(); 
     var sourceIndex = 0; 
     for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++) 
     { 
      var list = new List<T>(); 
      int itemsLeft = source.Count - targetIndex; 
      while (slicesCount * list.Count < itemsLeft) 
      { 
       list.Add(source[sourceIndex++]); 
      } 

      yield return list; 
     } 
    } 

    /// <summary> 
    /// Takes collection of collections, projects those in parallel and merges results. 
    /// </summary> 
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
     this IEnumerable<IEnumerable<T>> source, 
     Func<T, Task<TResult>> func) 
    { 
     List<TResult>[] slices = await source 
      .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false)) 
      .WhenAll() 
      .ConfigureAwait(false); 
     return slices.SelectMany(s => s); 
    } 

    /// <summary>Runs selector and awaits results.</summary> 
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector) 
    { 
     List<TResult> result = new List<TResult>(); 
     foreach (TSource source1 in source) 
     { 
      TResult result1 = await selector(source1).ConfigureAwait(false); 
      result.Add(result1); 
     } 
     return result; 
    } 

    /// <summary>Wraps tasks with Task.WhenAll.</summary> 
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source) 
    { 
     return Task.WhenAll<TResult>(source); 
    } 
}