這一切都發生在Windows服務中。一次只能同時使用任務並行庫處理n個項目
我有一個Queue<T>
(實際上是一個ConcurrentQueue<T>
)持有物品等待處理。但是,我不想一次只處理一個,我想同時處理n個項目,其中n是一個可配置的整數。
如何使用任務並行庫執行此操作?
我知道TPL會代表開發人員爲併發處理分割集合,但不知道這是我之後的功能。我是多線程和TPL的新手。
這一切都發生在Windows服務中。一次只能同時使用任務並行庫處理n個項目
我有一個Queue<T>
(實際上是一個ConcurrentQueue<T>
)持有物品等待處理。但是,我不想一次只處理一個,我想同時處理n個項目,其中n是一個可配置的整數。
如何使用任務並行庫執行此操作?
我知道TPL會代表開發人員爲併發處理分割集合,但不知道這是我之後的功能。我是多線程和TPL的新手。
使用BlockingCollection<T>
而不是ConcurrentQueue<T>
,那麼您可以啓動任意數量的消費者線程並使用BlockingCollection
的Take
方法。如果集合爲空,則Take
方法將自動阻止調用方線程等待要添加的項目,否則線程將並行使用所有隊列項目。然而,正如你的問題中提到的使用TPL,事實證明Parallel.ForEach
在使用BlockingCollection
檢查this後有更多詳細信息。所以你必須管理你的消費者創建你自己的線程。 new Thread(/*consumer method*/)
或new Task()
...
這裏有一個想法,涉及創建一個TaskFactory
的擴展方法。
public static class TaskFactoryExtension
{
public static Task StartNew(this TaskFactory target, Action action, int parallelism)
{
var tasks = new Task[parallelism];
for (int i = 0; i < parallelism; i++)
{
tasks[i] = target.StartNew(action);
}
return target.StartNew(() => Task.WaitAll(tasks));
}
}
然後您的調用代碼如下所示。
ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
var task = Task.Factory.StartNew(
() =>
{
T item;
while (queue.TryDequeue(out item))
{
ProcessItem(item);
}
}, n);
task.Wait(); // Optionally wait for everything to finish.
這是使用Parallel.ForEach
的另一個想法。這種方法的問題在於你的並行度可能不一定會得到滿足。您只是表示允許的最大金額,而不是絕對金額。
ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n },
(item) =>
{
ProcessItem(item);
});
我也建議使用BlockingCollection
而不是直接使用ConcurrentQueue
的。
下面是一個例子:
public class QueuingRequestProcessor
{
private BlockingCollection<MyRequestType> queue;
public void QueuingRequestProcessor(int maxConcurrent)
{
this.queue = new BlockingCollection<MyRequestType>(maxConcurrent);
Task[] consumers = new Task[maxConcurrent];
for (int i = 0; i < maxConcurrent; i++)
{
consumers[i] = Task.Factory.StartNew(() =>
{
// Will wait when queue is empty, until CompleteAdding() is called
foreach (var request in this.queue.GetConsumingEnumerable())
{
Process(request);
}
});
}
}
public void Add(MyRequest request)
{
this.queue.Add(request);
}
public void Stop()
{
this.queue.CompleteAdding();
}
private void Process(MyRequestType request)
{
// Do your processing here
}
}
注意maxConcurrent
在構造函數中定義了多少個請求將被同時處理。
BlockingCollection失敗了隊列的用途。迭代時,我無法從阻塞集合中刪除項目。 –
不,您可以通過使用[GetConsumingEnumerable](http://msdn.microsoft.com/en-us/library/dd287186.aspx)來代替。就像'foreach(_collection.GetConsumingEnumerable()中的item item)',如果集合是空的,它也會阻塞等待項目被添加。 –