我試圖用一組資源實現生產者/消費者模式,因此每個線程都有一個與之相關的資源。例如,我可能有一個任務隊列,其中每個任務需要StreamWriter
來編寫其結果。每個任務還必須傳遞給它的參數。生產者 - 消費者使用資源
我從Joseph Albahari的實現開始(參見下面的修改版本)。
我取代的Action
隊列與Action<T>
一個隊列,其中T
是資源,並通過與螺紋到Action
相關聯的資源。但是,這給我帶來了如何將參數傳遞給Action
的問題。顯然,Action
必須替換爲一個委託,但這會導致在任務排隊時如何傳遞參數(來自ProducerConsumerQueue
類之外)的問題。任何想法如何做到這一點?
class ProducerConsumerQueue<T>
{
readonly object _locker = new object();
Thread[] _workers;
Queue<Action<T>> _itemQ = new Queue<Action<T>>();
public ProducerConsumerQueue(T[] resources)
{
_workers = new Thread[resources.Length];
// Create and start a separate thread for each worker
for (int i = 0; i < resources.Length; i++)
{
Thread thread = new Thread(() => Consume(resources[i]));
thread.SetApartmentState(ApartmentState.STA);
_workers[i] = thread;
_workers[i].Start();
}
}
public void Shutdown(bool waitForWorkers)
{
// Enqueue one null item per worker to make each exit.
foreach (Thread worker in _workers)
EnqueueItem(null);
// Wait for workers to finish
if (waitForWorkers)
foreach (Thread worker in _workers)
worker.Join();
}
public void EnqueueItem(Action<T> item)
{
lock (_locker)
{
_itemQ.Enqueue(item); // We must pulse because we're
Monitor.Pulse(_locker); // changing a blocking condition.
}
}
void Consume(T parameter)
{
while (true) // Keep consuming until
{ // told otherwise.
Action<T> item;
lock (_locker)
{
while (_itemQ.Count == 0) Monitor.Wait(_locker);
item = _itemQ.Dequeue();
}
if (item == null) return; // This signals our exit.
item(parameter); // Execute item.
}
}
}
問題是我不想爲每個任務提供自己的StreamWriter。我希望每個StreamWriter都屬於一個線程,它將在執行任務時重用它。 – Johnny 2011-05-14 20:44:06