2017-02-25 75 views
1

我試圖以異步方式處理5000個文件,而不增加線程池不受限制。然而,Parallel.For循環並沒有給我一個一致的正確答案(計數不足),而Task.Run是。Parallel.For vs ThreadPool和異步/等待

我在Parallel.For循環中做錯了什麼,導致這些不正確的答案?

using System; 
using System.Collections.Generic; 
using System.IO; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

class Program 
{ 
    static volatile int count = 0; 
    static volatile int total = 0; 
    static void Main(string[] args) 
    { 
     Parallel.For(0, 5000, new ParallelOptions { MaxDegreeOfParallelism = 10 }, 
      async (index) => 
      { 
       string filePath = $"C:\\temp\\files\\out{index}.txt"; 
       var bytes = await ReadFileAsync(filePath); 
       Interlocked.Add(ref total, bytes.Length); 
       Interlocked.Increment(ref count); 
      }); 
     Console.WriteLine(count); 
     Console.WriteLine(total); 

     count = 0; 
     total = 0; 
     List<Task> tasks = new List<Task>(); 
     foreach (int index in Enumerable.Range(0, 5000)) 
     { 
      tasks.Add(Task.Run(async() => 
      { 
       string filePath = $"C:\\temp\\files\\out{index}.txt"; 
       var bytes = await ReadFileAsync(filePath); 
       Interlocked.Add(ref total, bytes.Length); 
       Interlocked.Increment(ref count); 
      })); 
     } 
     Task.WhenAll(tasks).Wait(); 
     Console.WriteLine(count); 
     Console.WriteLine(total); 
    } 
    public static async Task<byte[]> ReadFileAsync(string filePath) 
    { 
     byte[] bytes = new byte[4096]; 
     using (var sourceStream = new FileStream(filePath, 
       FileMode.Open, FileAccess.Read, FileShare.Read, 
       bufferSize: 4096, useAsync: true)) 
     { 
      await sourceStream.ReadAsync(bytes, 0, 4096); 
     }; 
     return bytes; 
    } 
} 
+0

你能具體談談你所期望的結果。你是什​​麼意思_生成線程池unrestricted_在你的第二個示例中排隊5000任務,但這並不意味着他們已經準備好運行。在任何情況下,只需要沒有聯鎖的揮發性。另外,在這兩種情況下,你希望'count'的值是多少? – JSteward

+0

在我看來,閱讀整個文件只是爲了獲得它的長度是非常低效的。 '新的FileInfo(somePath).Length'會更有效率(儘管不是異步的......但是,新的FileStream的設置也做了一堆同步的東西,包括獲得長度,所以它沒有損失) – spender

+1

Don' t混合'Parallel.For/ForEach'與'async/await' – MickyD

回答

9

Parallel.For不是async意識到。

因此,Parallel.For未按照您的預期執行。由於異步lambda生成的任務沒有被等待,所有的迭代都會在完成創建任務時完成,而不是完成它們。

在您的Parallel.For之後,迭代次數仍然會有一個尚未完成的待定任務,因此您添加到counttotal尚未完成。

Stephen Toub實現了Parallel.ForEach的異步版本。 (ForEachAsync)的實現如下:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll(
     from partition in Partitioner.Create(source).GetPartitions(dop) 
     select Task.Run(async delegate { 
      using (partition) 
       while (partition.MoveNext()) 
        await body(partition.Current); 
     })); 
} 

所以,你可能會改寫你的循環:

Enumerable.Range(0, 5000).ForEachAsync(10, async (index)=>{ 
    //$$$ 
}); 
+0

有趣的是,我不知道你不能把Parallel.For和async結合起來。 '因爲異步lambda生成的任務沒有被等待,所有的迭代都會在創建任務所需的時間內完成,而不是完成它們。「Task.WhenAll不會等待所有任務完成? – MvdD

+0

@MvdD雖然它只等待第一個「異步」方法「退出」,但它確實如此。 「返回」到「異步」可能不會發生。 – VMAtm