2016-12-25 59 views
0

我需要一個集合/包(東西),它將保存一些運行Task對象的最大數量。將新的運行Task添加到集合應該阻止調用線程(有很多線程試圖添加任務,因此它應該是線程安全的),直到有新的任務的可用插槽被添加,如果最大數量的正在運行的任務到達了。這是我迄今爲止所做的,並且工作正常。集合保持運行任務的最大數量

public class ConcurrentTaskLimiter 
{ 
    public int MaxWorkingTasks { get; } 
    private readonly Task[] _tasks; 
    private readonly bool[] _finished; 

    public ConcurrentTaskLimiter(int maxWorkingTasks) 
    { 
     MaxWorkingTasks = maxWorkingTasks; 
     if ((1 <= maxWorkingTasks) == false) 
      throw new ArgumentOutOfRangeException(nameof(maxWorkingTasks), maxWorkingTasks, "Must be >= 1"); 
     _tasks = new Task[maxWorkingTasks]; 
     _finished = new bool[maxWorkingTasks]; 

     for (int i = 0; i < MaxWorkingTasks; i++) 
     { 
      _tasks[i] = Task.FromResult(0); // use this as finished tasks 
      _finished[i] = true; 
     } 
    } 

    public void BlockAdd(Task t) 
    { 
     if (t == null) 
      throw new ArgumentNullException(nameof(t)); 

     if (t.Status == TaskStatus.Canceled 
      || t.Status == TaskStatus.Faulted 
      || t.Status == TaskStatus.RanToCompletion) 
      return; 

     lock (this) 
     { 
      int i; 
      while (true) 
      { 
       for (i = 0; i < MaxWorkingTasks; i++) 
       { 
        if (_finished[i]) 
        { 
         _tasks[i] = t; 
         _finished[i] = false; 
         return; 
        } 
       } 
       i = Task.WaitAny(_tasks); 
       _finished[i] = true; 
      } 
     } 
    } 
} 

你覺得這個代碼的任何問題嗎?還是有一些內置的類可以處理這種任務:)?

+0

如果您的代碼正常工作,請在codereview上發佈此信息,以明確說明問題所在。 –

+0

你應該看看[TPL DataFlow](https://msdn.microsoft.com/en-us/library/hh228603(v = vs.110).aspx),它可能有你所需要的。 –

回答

0

使用一個SynchronizedCollection<T>它會照顧線程安全爲您開箱。或者使用System.Collections.Concurrent的衆多系列之一。我會使用System.Collections.Concurrent之一,因爲它們更新更高效。

3

.NET有一個名爲BlockingCollection<T>的類,可以極大地簡化您的任務。

阻塞收集的實例可以初始化,它可以容納的任務數量的上限。當您這樣做時,添加任務會導致收集超出容量時,Add的呼叫將被阻止。

+0

我想到了BlockingCollection ,但在這種情況下,我該如何刪除/刪除完成的任務,以便爲新的運行任務騰出空間? –

0

我會提出更優雅的解決方案。

public class TasksPool 
    { 
     public int MaxSize { get; private set; } 

     public TasksPool(int maxSize) 
     { 
      if (maxSize < 1) throw new IndexOutOfRangeException("Should be 1 or more"); 
      MaxSize = maxSize; 
      _semaphore = new SemaphoreSlim(maxSize-1); 
     } 

     private readonly SemaphoreSlim _semaphore; 

     public void Add(Task t, CancellationToken token) 
     { 
      _semaphore.Wait(token); 

      if (token.IsCancellationRequested) return; 
      t.ContinueWith(q => _semaphore.Release(), token); 
     } 
    }