2016-04-12 38 views
10

我有一個1000個輸入消息的集合進行處理。我循環輸入集合並開始處理每個消息的新任務。如何限制c#中的最大並行任務數量

//Assume this messages collection contains 1000 items 
var messages = new List<string>(); 

foreach (var msg in messages) 
{ 
    Task.Factory.StartNew(() => 
    { 
    Process(msg); 
    }); 
} 

我們能猜出多少最大的消息同時在時間(假設正常的四核處理器)得到處理,或者我們可以在限制時間處理的消息的最大數量?

如何確保此消息按照集合的相同順序處理?

+0

如何將消息拆分爲批處理並且並行運行每個批處理? – bit

回答

7

SemaphoreSlim在這種情況下,一個很好的解決方案,我higly建議OP試試這個,但@ Manoj的回答存在缺陷,正如comments.semaphore應該在產生這樣的任務之前等待。

更新答:作爲@Vasyl指出信號燈可以完成任務之前設置,並會引發異常時發佈()方法被調用,所以之前退出使用塊必須等待所有創建的任務的完成。

int maxConcurrency=10; 
var messages = new List<string>(); 
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) 
{ 
    List<Task> tasks = new List<Task>(); 
    foreach(var msg in messages) 
    { 
     concurrencySemaphore.Wait(); 

     var t = Task.Factory.StartNew(() => 
     { 

      try 
      { 
       Process(msg); 
      } 
      finally 
      { 
       concurrencySemaphore.Release(); 
      } 
     }); 

     tasks.Add(t); 
    } 

    Task.WaitAll(tasks.ToArray()); 
} 
+0

如果'Process'方法會運行很長時間會怎麼樣? 'concurrencySemaphore.Release()'可能在'concurrencySemaphore'已經被處理時被調用。結果就是 - ObjectDisposedException。 –

+0

@VasylZvarydchuk你是對的。我已經更新了答案 – ClearLogic

1

你可以簡單的設置最大併發度像這樣:

int maxConcurrency=10; 
var messages = new List<1000>(); 
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) 
{ 
    foreach(var msg in messages) 
    { 
     Task.Factory.StartNew(() => 
     { 
      concurrencySemaphore.Wait(); 
      try 
      { 
       Process(msg); 
      } 
      finally 
      { 
       concurrencySemaphore.Release(); 
      } 
     }); 
    } 
} 
+1

這會不必要地阻塞線程,如果線程池的線程數多於最大併發數。 – yaakov

11

你可以使用Parallel.Foreach,依靠MaxDegreeOfParallelism代替。

Parallel.ForEach(messages, new ParallelOptions {MaxDegreeOfParallelism = 10}, 
msg => 
{ 
    // logic 
    Process(msg); 
}); 
+1

這正是「Parallel.ForEach」的處理過程。 –

+0

由於任務並行庫是建立在'ThreadPool'上的,因此我們可以假設它只會運行與系統具有核心數量一樣多的任務,如果我們沒有明確指定的話。 – Toxantron

+0

這樣可以確保消息的處理順序與它們在列表中發生的順序相同嗎? – bit

1

覺得這是更好的用戶並行LINQ

Parallel.ForEach(messages , 
    new ParallelOptions{MaxDegreeOfParallelism = 4}, 
      x => Process(x); 
     ); 

其中x是最大並行度

0

如果您需要按順序排隊(處理可能按任意順序完成),則不需要信號量。老式的if語句工作正常:

 const int maxConcurrency = 5; 
     List<Task> tasks = new List<Task>(); 
     foreach (var arg in args) 
     { 
      var t = Task.Run(() => { Process(arg); }); 

      tasks.Add(t); 

      if(tasks.Count >= maxConcurrency) 
       Task.WaitAny(tasks.ToArray()); 
     } 

     Task.WaitAll(tasks.ToArray());