2017-10-09 27 views
-1

我有一列我想要使用Parallel.ForEach並行執行的任務。它可以很好地開始並行運行4個任務,但最終每次只能減少一個任務。 這裏是並行任務的時間計數:Parallel.ForEach最後在當時執行一個任務

1 2 3 4 4 3 4 4 ... 4 4 4 3 3 1 1 1 1 1 1 1

並行

最大程度被設置爲4 。執行結束時,一次只執行一個任務,所有執行都在同一個線程上運行。我的問題是爲什麼我最終一次執行這個任務?我怎樣才能避免這種情況?

下面是代碼:

var threadCount = 4; 
ThreadPool.SetMinThreads(threadCount, threadCount); 
Parallel.ForEach(taskDataList, 
    new ParallelOptions() {MaxDegreeOfParallelism = threadCount}, 
    (x) => { RunOne(x); }); 

RunOne函數啓動外部處理並等待其結束。有人懷疑RunOne可能是缺乏並行執行的問題。爲了確保不是這種情況,我通過用相同持續時間的睡眠呼叫替換此函數來重新創建情況。 代碼如下。這裏t是每個任務花費的秒數。 activeCount是當前正在運行的任務的數量,其餘是仍然保留在列表中的任務數量。

var t = new List<int>() 
{2,2,2,1,1,1,1,1,1,1, 
1,1,1,1,1,3,1,1,1,1, 
1,1,1,1,1,1,1,1,5,4, 
26,12,11,16,44,4,37,26,13,36}; 
int activeCount = 0; 
int remaining = t.Count; 
Parallel.ForEach(t, new ParallelOptions() {MaxDegreeOfParallelism = 4}, 
    (x) => 
    { 
     Console.WriteLine($"Active={Interlocked.Increment(ref activeCount)}"+ 
      $"Remaining={Interlocked.Decrement(ref remaining)} " + 
      $"Run thread={Thread.CurrentThread.ManagedThreadId}"); 
     Thread.Sleep(x * 1000); //Sleep x seconds 
     Interlocked.Decrement(ref activeCount); 
    }); 

在快結束它產生的輸出是這樣的:

Active=2 Remaining=7 Run thread=3 
Active=1 Remaining=6 Run thread=3 
Active=1 Remaining=5 Run thread=3 
Active=1 Remaining=4 Run thread=3 
Active=1 Remaining=3 Run thread=3 
Active=1 Remaining=2 Run thread=3 
Active=1 Remaining=1 Run thread=3 
Active=1 Remaining=0 Run thread=3 

此輸出顯示,在年底的時候6個任務仍然只有1個任務運行。由於4個並行任務的限制,它沒有任何意義。當6個任務仍然可用時,我希望看到4個任務並行運行。

我應該不同地使用Parallel.ForEach或者它是一個錯誤/功能?

+2

'RunOne()'中的代碼是什麼? –

+0

RunOne調用外部進程並等待它完成。 –

+1

你已經顯示的代碼是100%健壯的並且正常工作。你沒有顯示的代碼 - 「RunOne(...)' - 可能在這裏出錯;你可以請出示這種方法嗎? – Enigmativity

回答

0

在查看了Parallel.ForEach的參考資源之後,我發現不是將元素逐一分配給不同的線程,而是將任務列表分割成塊,然後將任務列表提供給每個線程。它是長期運行的任務

 var t = new List<int>() 
      {2,2,2,1,1,1,1,1,1,1, 
      1,1,1,1,1,3,1,1,1,1, 
      1,1,1,1,1,1,1,1,5,4, 
      26,12,11,16,44,4,37,26,13,36}; 
     int activeCount = 0; 
     int remaining = t.Count; 
     var cq = new ConcurrentQueue<int>(t); 
     var tasks = new List<Task>(); 
     for (int i = 0; i < 4; i++) tasks.Add(Task.Factory.StartNew(() => 
     { 
      int x; 
      while (cq.TryDequeue(out x)) 
      { 
       Console.WriteLine($"Active={Interlocked.Increment(ref activeCount)} " + 
        $"Remaining={Interlocked.Decrement(ref remaining)} " + 
        $"Run thread={Thread.CurrentThread.ManagedThreadId}"); 
       Thread.Sleep(x * 1000); //Sleep x seconds 
       Interlocked.Decrement(ref activeCount); 
      } 
     })); 
     Task.WaitAll(tasks.ToArray()); 

我用4個並行任務如在第一代碼例如是非常低效的方法。在使用Parallel.ForEach花費211秒時,在這種情況下執行時間爲83秒。這只是證明了Parallel.ForEach在某些情況下非常低效,應謹慎使用。

+0

*我發現,不是將元素逐個分發給不同的線程,而是將任務列表分成塊和然後給每個線程的任務列表*有​​一個'Parallel.ForEach'的重載使用你提供的分區器,你可以分割任務,但是你想要,例如https://msdn.microsoft.com/en-us/library/dd381768(v=vs.110).aspx#Examples –

+0

*對於長時間運行的任務,這是非常低效的方法*還有'Task.LongRunning'提示https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskcreationoptions(v=vs.110).aspx –

+0

任務,LongRunning提示與Parallel.ForEach無關。分區程序會工作我猜,但你可能是世界上知道它的兩個人之一:)我也不理解默認分區實現背後的邏輯。通用方法的假設太多了。 –

相關問題