8

這一切都發生在Windows服務中。一次只能同時使用任務並行庫處理n個項目

我有一個Queue<T>(實際上是一個ConcurrentQueue<T>)持有物品等待處理。但是,我不想一次只處理一個,我想同時處理n個項目,其中n是一個可配置的整數。

如何使用任務並行庫執行此操作?

我知道TPL會代表開發人員爲併發處理分割集合,但不知道這是我之後的功能。我是多線程和TPL的新手。

回答

4

使用BlockingCollection<T>而不是ConcurrentQueue<T>,那麼您可以啓動任意數量的消費者線程並使用BlockingCollectionTake方法。如果集合爲空,則Take方法將自動阻止調用方線程等待要添加的項目,否則線程將並行使用所有隊列項目。然而,正如你的問題中提到的使用TPL,事實證明Parallel.ForEach在使用BlockingCollection檢查this後有更多詳細信息。所以你必須管理你的消費者創建你自己的線程。 new Thread(/*consumer method*/)new Task() ...

+0

BlockingCollection失敗了隊列的用途。迭代時,我無法從阻塞集合中刪除項目。 –

+2

不,您可以通過使用[GetConsumingEnumerable](http://msdn.microsoft.com/en-us/library/dd287186.aspx)來代替。就像'foreach(_collection.GetConsumingEnumerable()中的item item)',如果集合是空的,它也會阻塞等待項目被添加。 –

4

這裏有一個想法,涉及創建一個TaskFactory的擴展方法。

public static class TaskFactoryExtension 
{ 
    public static Task StartNew(this TaskFactory target, Action action, int parallelism) 
    { 
     var tasks = new Task[parallelism]; 
     for (int i = 0; i < parallelism; i++) 
     { 
      tasks[i] = target.StartNew(action); 
     } 
     return target.StartNew(() => Task.WaitAll(tasks)); 
    } 
} 

然後您的調用代碼如下所示。

ConcurrentQueue<T> queue = GetQueue(); 
int n = GetDegreeOfParallelism(); 
var task = Task.Factory.StartNew(
() => 
    { 
    T item; 
    while (queue.TryDequeue(out item)) 
    { 
     ProcessItem(item); 
    } 
    }, n); 
task.Wait(); // Optionally wait for everything to finish. 

這是使用Parallel.ForEach的另一個想法。這種方法的問題在於你的並行度可能不一定會得到滿足。您只是表示允許的最大金額,而不是絕對金額。

ConcurrentQueue<T> queue = GetQueue(); 
int n = GetDegreeOfParallelism(); 
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n }, 
    (item) => 
    { 
    ProcessItem(item);  
    }); 
1

我也建議使用BlockingCollection而不是直接使用ConcurrentQueue的。

下面是一個例子:

public class QueuingRequestProcessor 
{ 
    private BlockingCollection<MyRequestType> queue; 

    public void QueuingRequestProcessor(int maxConcurrent) 
    { 
    this.queue = new BlockingCollection<MyRequestType>(maxConcurrent); 

    Task[] consumers = new Task[maxConcurrent]; 

    for (int i = 0; i < maxConcurrent; i++) 
    { 
     consumers[i] = Task.Factory.StartNew(() => 
     { 
     // Will wait when queue is empty, until CompleteAdding() is called 
     foreach (var request in this.queue.GetConsumingEnumerable()) 
     { 
      Process(request); 
     } 
     }); 
    } 
    } 

    public void Add(MyRequest request) 
    { 
    this.queue.Add(request); 
    } 

    public void Stop() 
    { 
    this.queue.CompleteAdding(); 
    } 

    private void Process(MyRequestType request) 
    { 
    // Do your processing here 
    } 
} 

注意maxConcurrent在構造函數中定義了多少個請求將被同時處理。

相關問題