2011-05-14 226 views
1

我試圖用一組資源實現生產者/消費者模式,因此每個線程都有一個與之相關的資源。例如,我可能有一個任務隊列,其中每個任務需要StreamWriter來編寫其結果。每個任務還​​必須傳遞給它的參數。生產者 - 消費者使用資源

我從Joseph Albahari的實現開始(參見下面的修改版本)。

我取代的Action隊列與Action<T>一個隊列,其中T是資源,並通過與螺紋到Action相關聯的資源。但是,這給我帶來了如何將參數傳遞給Action的問題。顯然,Action必須替換爲一個委託,但這會導致在任務排隊時如何傳遞參數(來自ProducerConsumerQueue類之外)的問題。任何想法如何做到這一點?

class ProducerConsumerQueue<T> 
    { 
     readonly object _locker = new object();    
     Thread[] _workers; 
     Queue<Action<T>> _itemQ = new Queue<Action<T>>(); 

     public ProducerConsumerQueue(T[] resources) 
     { 
      _workers = new Thread[resources.Length]; 

      // Create and start a separate thread for each worker 
      for (int i = 0; i < resources.Length; i++) 
      { 
       Thread thread = new Thread(() => Consume(resources[i])); 
       thread.SetApartmentState(ApartmentState.STA); 
       _workers[i] = thread; 
       _workers[i].Start(); 
      } 
     }   

     public void Shutdown(bool waitForWorkers) 
     { 
      // Enqueue one null item per worker to make each exit. 
      foreach (Thread worker in _workers) 
       EnqueueItem(null); 

      // Wait for workers to finish 
      if (waitForWorkers) 
       foreach (Thread worker in _workers) 
        worker.Join(); 
     } 

     public void EnqueueItem(Action<T> item) 
     { 
      lock (_locker) 
      { 
       _itemQ.Enqueue(item);   // We must pulse because we're 
       Monitor.Pulse(_locker);   // changing a blocking condition. 
      } 
     } 

     void Consume(T parameter) 
     { 
      while (true)      // Keep consuming until 
      {         // told otherwise. 
       Action<T> item; 
       lock (_locker) 
       { 
        while (_itemQ.Count == 0) Monitor.Wait(_locker); 
        item = _itemQ.Dequeue(); 
       } 
       if (item == null) return;   // This signals our exit. 
       item(parameter);       // Execute item. 
      } 
     } 
    } 

回答

2

類型TProducerConsumerQueue<T>並不一定是你的資源也可以是包含您的資源的複合型。使用.NET4最簡單的方法是使用Tuple<StreamWriter, YourParameterType>。產品/消費者隊列只是吃掉並吐出T,因此在Action<T>中,您只需使用屬性即可獲取資源和參數。如果您使用的是Tuple,則可以使用Item1獲取資源,使用Item2獲取參數。

如果不使用.NET4,這個過程是相似的,但你剛剛創建自己的類:

public class WorkItem<T> 
{ 
    private StreamWriter resource; 
    private T parameter; 

    public WorkItem(StreamWriter resource, T parameter) 
    { 
     this.resource = resource; 
     this.parameter = parameter; 
    } 

    public StreamWriter Resource { get { return resource; } } 
    public T Parameter { get { return parameter; } } 
} 

事實上,使其成爲通用的可超安全標準設計您的具體情況。您可以將T定義爲您想要的類型。

此外,作爲參考,還有一些新的方法可用於.NET4中包含的多線程,這些方法可能適用於您的用例,例如併發隊列和並行任務庫。它們也可以與信號量等傳統方法結合使用。

編輯:

這種方法繼續,這裏是一個演示使用一個小樣本類:

  • 信號量來控制訪問有限的資源
  • 併發隊列管理該線程之間的資源安全
  • 使用任務並行庫的任務管理

這裏是Processor類:

public class Processor 
{ 
    private const int count = 3; 
    private ConcurrentQueue<StreamWriter> queue = new ConcurrentQueue<StreamWriter>(); 
    private Semaphore semaphore = new Semaphore(count, count); 

    public Processor() 
    { 
     // Populate the resource queue. 
     for (int i = 0; i < count; i++) queue.Enqueue(new StreamWriter("sample" + i)); 
    } 

    public void Process(int parameter) 
    { 
     // Wait for one of our resources to become free. 
     semaphore.WaitOne(); 
     StreamWriter resource; 
     queue.TryDequeue(out resource); 

     // Dispatch the work to a task. 
     Task.Factory.StartNew(() => Process(resource, parameter)); 
    } 

    private Random random = new Random(); 

    private void Process(StreamWriter resource, int parameter) 
    { 
     // Do work in background with resource. 
     Thread.Sleep(random.Next(10) * 100); 
     resource.WriteLine("Parameter = {0}", parameter); 
     queue.Enqueue(resource); 
     semaphore.Release(); 
    } 
} 

,現在我們可以使用這樣的類:

var processor = new Processor(); 
for (int i = 0; i < 10; i++) 
    processor.Process(i); 

和不超過三個任務將在同一時間進行安排,各自有各自擁有StreamWriter資源被回收。

+0

問題是我不想爲每個任務提供自己的StreamWriter。我希望每個StreamWriter都屬於一個線程,它將在執行任務時重用它。 – Johnny 2011-05-14 20:44:06