2011-11-16 130 views
3

在.net 2.0上工作我需要實現一些線程,我正在查看一個虛擬示例,但找不到實現事件通知的任何內容。 需要知道什麼時候完成,還有某種進度條,如果你喜歡。生產者消費者模式你如何通知完成?

我一直在玩下面的代碼通過看不到正確的事件通知。 我如何檢測到我已完成處理並可能使用我一直在做的工作來更新ui?

實施例代碼

class Program 
{ 
    static void Main(string[] args) 
    { 
     using (PCQueue q = new PCQueue(2)) 
     { 
      q.TaskCompleted += new EventHandler(OnTaskCompleted); 
      q.PercentageCompleted += new EventHandler(OnPercentageCompleted); 


      for (int i = 1; i < 100; i++) 
      { 

       string itemNumber = i.ToString(); // To avoid the captured variable trap 
       q.EnqueueItem(itemNumber); 
      } 
      Console.WriteLine("Waiting for items to complete..."); 
      Console.Read(); 
     } 
    } 

    private static void OnPercentageCompleted(object sender, EventArgs e) 
    { 

    } 

    static void OnTaskCompleted(object sender, EventArgs e) 
    { 

    } 
} 


public class PCQueue : IDisposable 
{ 
    readonly object locker = new object(); 
    Thread[] _workers; 
    Queue<string> _itemQ = new Queue<string>(); 
    public PCQueue(int workerCount) 
    { 
     _workers = new Thread[workerCount]; 
     // Create and start a separate thread for each worker 
     for (int i = 0; i < workerCount; i++) 
     { 
      (_workers[i] = new Thread(Consume)).Start(); 
     } 
    } 

    public void EnqueueItem(string item) 
    { 
     lock (locker) 
     { 
      _itemQ.Enqueue(item); // We must pulse because we're 
      Monitor.Pulse(locker); // changing a blocking condition. 
     } 
    } 
    void Consume() 
    { 
     while (true) // Keep consuming until 
     { // told otherwise. 
      string item; 
      lock (locker) 
      { 
       while (_itemQ.Count == 0) Monitor.Wait(locker); 
       item = _itemQ.Dequeue(); 
      } 
      if (item == null) return; // This signals our exit. 

      DoSomething(item); // Execute item. 
     } 
    } 
    private void DoSomething(string item) 
    { 
     Console.WriteLine(item); 
    } 
    public void Dispose() 
    { 
     // Enqueue one null item per worker to make each exit. 
     foreach (Thread worker in _workers) 
     { 
      EnqueueItem(null); 
     } 
    } 

    //where/how can I fire this event??? 
    public event EventHandler TaskCompleted; 
    protected void OnCompleted(EventArgs e) 
    { 
     if (this.TaskCompleted != null) 
     { 
      this.TaskCompleted(this, e); 
     } 
    } 
    //where/how can I fire this event??? 
    public event EventHandler PercentageCompleted; 
    protected void OnPercentageCompleted(EventArgs e) 
    { 
     if (this.PercentageCompleted != null) 
     { 
      this.PercentageCompleted(this, e); 
     } 
    } 
    } 

任何建議?

+0

這應該給你一些想法:http://msdn.microsoft.com/en-us/library/system.threading.monitor.pulse(v=VS.80).aspx –

+0

感謝您的鏈接。對不起,但看不到任何事件發生。 – user9969

+0

啊,你特別想要事件。我沒有看到任何會阻止你處理q.WhateverEvent假設被解僱的事情。你可以發佈你的PCQueue的代碼嗎? –

回答

2

您不能在隊列中引發進度事件,原因很簡單,因爲隊列不知道應該處理的總數項目。所以它不能計算一個百分比。你只需要插入一些東西,然後進行處理。

你能做什麼就是引發一個ItemProcessed事件並訂閱它。然後在您的主程序中,您可以執行邏輯,計算到目前爲止處理了多少項目,以及應處理的項目數量。

您可以在從Consume函數返回之前提高完整事件。然而,當Brian在回答中說,你需要跟蹤有多少線程仍然活動。我修改了代碼以反映這一點。

因此就沿着這些線路:

... 
private int _ActiveThreads; 
public PCQueue(int workerCount) 
{ 
    _ActiveThreads = workerCount; 
    _workers = new Thread[workerCount]; 
    // Create and start a separate thread for each worker 
    for (int i = 0; i < workerCount; i++) 
    { 
     (_workers[i] = new Thread(Consume)).Start(); 
    } 
} 

void Consume() 
{ 
    while (true) // Keep consuming until 
    { // told otherwise. 
     string item; 
     lock (locker) 
     { 
      while (_itemQ.Count == 0) Monitor.Wait(locker); 
      item = _itemQ.Dequeue(); 
     } 
     if (item == null) // This signals our exit. 
     { 
      if (Interlocked.Decrement(ref _ActiveThreads) == 0) 
      { 
       OnCompleted(EventArgs.Empty); 
      } 
      return; 
     } 

     DoSomething(item); // Execute item. 
     OnItemProcessed(); 
    } 
} 

public event EventHandler ItemProcessed; 
protected void OnItemProcessed() 
{ 
    var handler = ItemProcessed; 
    if (handler != null) 
    { 
     handler(this, EventArgs.Empty); 
    } 
} 
... 

當然,你可能需要創建一些meaningfull事件參數實際上通過其處理該事件的項目。

然後在主:

... 
static void Main(string[] args) 
{ 
    using (PCQueue q = new PCQueue(2)) 
    { 
     q.ItemProcessed += ItemProcessed; 
     q.TaskCompleted += OnTaskCompleted; 

     for (int i = 1; i <= totalNumberOfItems; i++) 
     { 
      string itemNumber = i.ToString(); // To avoid the captured variable trap 
      q.EnqueueItem(itemNumber); 
     } 
     Console.WriteLine("Waiting for items to complete..."); 
     Console.Read(); 
    } 
} 

private static int currentProcessCount = 0; 
private static int totalNumberOfItems = 100; 

private static void ItemProcessed(object sender, EventArgs e) 
{ 
    currentProcessCount++; 
    Console.WriteLine("Progress: {0}%", ((double)currentProcessCount/(double)totalNumberOfItems) * 100.0); 
} 

static void OnTaskCompleted(object sender, EventArgs e) 
{ 
    Console.WriteLine("Done"); 
} 
... 

不用說,所有的靜態的東西應該去。這只是基於你的例子。

還有一個備註: 你PCQueue目前需要你有工作線程,否則只有一個線程將退出,而其他人將等到進程退出,你排隊儘可能多的null值。您可以通過查看第一個項目來更改該項目,並且只有在不是null時纔將其刪除 - 從而將標記留在所有線程中。所以Consume就改成這樣:

void Consume() 
{ 
    while (true) // Keep consuming until 
    { // told otherwise. 
     string item; 
     lock (locker) 
     { 
      while (_itemQ.Count == 0) Monitor.Wait(locker); 
      item = _itemQ.Peek(); 
      if (item != null) _itemQ.Dequeue(); 
      else Monitor.PulseAll(); // if the head of the queue is null then make sure all other threads are also woken up so they can quit 
     } 
     if (item == null) // This signals our exit. 
     { 
      if (Interlocked.Decrement(ref _ActiveThreads) == 0) 
      { 
       OnCompleted(EventArgs.Empty); 
      } 
      return; 
     } 

     DoSomething(item); // Execute item. 
     OnItemProcessed(); 
    } 
} 
+0

任何示例?你說的聽起來不錯 – user9969

+0

@ user231465:更新了答案。 – ChrisWue

+0

非常感謝您的時間和答覆。我得到我的控制檯我得到所有「0」和所有「100」。可能是我錯過了point.My想要的結果是「1/10」「2/10」3/10等等......然後10/10如果我們正在處理10個項目,則全部完成 – user9969

2

在你PCQueue類,你將需要保持的許多工作線程如何仍然活躍的軌道,提高TaskCompleted只有在所有的線程已經指示結束。

void Consume() 
{ 
    while (true) 
    { 
     string item; 
     lock (locker) 
     { 
      while (_itemQ.Count == 0) Monitor.Wait(locker); 
      item = _itemQ.Dequeue(); 
     } 

     if (item == null) 
     { 
      // activeThreads is set to the number of workers in the constructor. 
      if (Interlocked.Decrement(ref activeThreads) == 0) 
      { 
       // Take a snapshot of the event so that a null check + invocation is safe. 
       // This works because delegates are immutable. 
       var copy = TaskCompleted; 

       if (copy != null) 
       { 
       copy(this, new EventArgs()); 
       } 
      } 
      return; 
     } 

     DoSomething(item); // Execute item. 
    } 
} 

幾個其他點:

  • 榮譽的獲得正確實施阻塞隊列。大多數人都錯了。
  • 請記住在觸摸任何UI控件之前將TaskCompleted事件處理程序重新編排回UI線程。
  • 您可以從DoSomething提高PercentCompleted,但是沒有清楚地表明隊列中有多少項應該保留,這個值是沒有意義的。我在這一點上第二Chris' recommendation
+0

試圖用activeThreads實現你的「消費」版本,但沒有運氣。使用Chris示例。我在哪裏聲明ref activeThreads? – user9969

+0

@ user231465:在'PCQueue'中聲明'activeThreads'作爲私有變量,然後在構造函數中執行'activeThreads = workerCount'。這個想法是'Interlocked.Decrement'與'if'檢查將導致'TaskCompleted'在最後一個工作線程完成後被引發。 –

+0

提及UI問題。關於保持活動線程計數的好主意 - 錯過了。 – ChrisWue