2013-02-27 95 views
2

這進一步是我的問題here.NET 2.0處理線程池使用

通過做一些閱讀非常大名單....我搬到遠離信號燈來線程池。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 

namespace ThreadPoolTest 
{ 
    class Data 
    { 
     public int Pos { get; set; } 
     public int Num { get; set; } 
    } 

    class Program 
    { 
     static ManualResetEvent[] resetEvents = new ManualResetEvent[20]; 

     static void Main(string[] args) 
     {    

      int s = 0; 
      for (int i = 0; i < 100000; i++) 
      {     
       resetEvents[s] = new ManualResetEvent(false); 
       Data d = new Data(); 
       d.Pos = s; 
       d.Num = i; 
       ThreadPool.QueueUserWorkItem(new WaitCallback(Process), (object)d); 
       if (s >= 19) 
       { 
        WaitHandle.WaitAll(resetEvents); 
        Console.WriteLine("Press Enter to Move forward"); 
        Console.ReadLine(); 
        s = 0; 
       } 
       else 
       { 
        s = s + 1; 
       } 
      } 
     } 

     private static void Process(object o) 
     { 
      Data d = (Data) o; 
      Console.WriteLine(d.Num.ToString()); 
      Thread.Sleep(10000); 
      resetEvents[d.Pos].Set(); 
     } 
    } 
} 

此代碼的工作原理和我能夠處理的20集。但我不喜歡這個代碼,因爲WaitAll。假設我開始一批20個,3個線程需要更長的時間,而17個完成。即使這樣,我仍會保持17個線程等待,因爲WaitAll。

WaitAny本來是不錯的......但是爲了有效地使用池,我必須構建如堆棧,列表,隊列等控制結構似乎相當麻煩。

我不喜歡的另一件事是整個全局變量在類的resetEvents。因爲這個數組必須在Process方法和主循環之間共享。

上面的代碼工作...但我需要你的幫助來改進它。

再次......我在.NET 2.0 VS 2008.我不能使用.NET 4.0並行/異步框架。

回答

3

有幾種方法可以做到這一點。也許最簡單的,根據你上面貼的東西,應該是:

const int MaxThreads = 4; 
const int ItemsToProcess = 10000; 
private Semaphore _sem = new Semaphore(MaxThreads, MaxThreads); 

void DoTheWork() 
{ 
    int s = 0; 
    for (int i = 0; i < ItemsToProcess; ++i) 
    { 
     _sem.WaitOne(); 
     Data d = new Data(); 
     d.Pos = s; 
     d.Num = i; 
     ThreadPool.QueueUserWorkItem(Process, d); 
     ++s; 
     if (s >= 19) 
      s = 0; 
    } 

    // All items have been assigned threads. 
    // Now, acquire the semaphore "MaxThreads" times. 
    // When counter reaches that number, we know all threads are done. 
    int semCount = 0; 
    while (semCount < MaxThreads) 
    { 
     _sem.WaitOne(); 
     ++semCount; 
    } 
    // All items are processed 

    // Clear the semaphore for next time. 
    _sem.Release(semCount); 
} 

void Process(object o) 
{ 
    // do the processing ... 

    // release the semaphore 
    _sem.Release(); 
} 

我只用了四個線程在我的例子,因爲這是我有多少個核都有。使用20個線程是沒有意義的,但其中只有四個線程可以在任何時間處理。但如果你願意,你可以自由增加MaxThreads號碼。

+1

我敢肯定,我做錯了什麼,但我的代碼總是暫停無限期地在第26行(_sem.WaiteOne();)。 我添加了一個計數器來檢查while(semCount = MaxThreads)return;在第25行。 這似乎工作,但感覺像一個黑客,我想了解爲什麼你的解決方案不適用於我... – GojiraDeMonstah 2013-10-31 14:16:21

+0

@GojiraDeMonstah:你的修改是正確的。我的代碼有一個錯誤。我在我的例子中修改了這個循環。感謝您爲我找到。 – 2013-10-31 14:25:35

2

所以我很確定這都是.NET 2.0。

我們將開始定義Action,因爲我很習慣使用它。如果在3.5+中使用此解決方案,請刪除該定義。

接下來,我們根據輸入創建一個動作隊列。

之後,我們定義一個回調;這種回調是方法的肉。

它首先抓取隊列中的下一個項目(使用鎖定,因爲隊列不是線程安全的)。如果它最終有一個物品可以抓住它執行該物品。接下來,它將一個新項目添加到「本身」的線程池中。這是一個遞歸的匿名方法(你不經常會遇到這種情況)。這意味着當第一次調用回調函數時,它將執行一個項目,然後計劃將執行另一個項目的任務,並且該項目將計劃執行另一個項目的任務,依此類推。最終隊列將耗盡,他們將停止排隊更多項目。

我們也希望方法能夠阻止,直到我們全部完成,因此我們通過遞增計數器來跟蹤這些回調已完成的數量。當該計數器達到任務限制時,我們發信號通知事件。

最後我們在線程池中啓動N個回調函數。

public delegate void Action(); 
public static void Execute(IEnumerable<Action> actions, int maxConcurrentItems) 
{ 
    object key = new object(); 
    Queue<Action> queue = new Queue<Action>(actions); 
    int count = 0; 
    AutoResetEvent whenDone = new AutoResetEvent(false); 

    WaitCallback callback = null; 
    callback = delegate 
    { 
     Action action = null; 
     lock (key) 
     { 
      if (queue.Count > 0) 
       action = queue.Dequeue(); 
     } 
     if (action != null) 
     { 
      action(); 
      ThreadPool.QueueUserWorkItem(callback); 
     } 
     else 
     { 
      if (Interlocked.Increment(ref count) == maxConcurrentItems) 
       whenDone.Set(); 
     } 

    }; 

    for (int i = 0; i < maxConcurrentItems; i++) 
    { 
     ThreadPool.QueueUserWorkItem(callback); 
    } 

    whenDone.WaitOne(); 
} 

下面是不使用線程池的另一種選擇,只是使用的線程固定數量:

public static void Execute(IEnumerable<Action> actions, int maxConcurrentItems) 
{ 
    Thread[] threads = new Thread[maxConcurrentItems]; 
    object key = new object(); 
    Queue<Action> queue = new Queue<Action>(actions); 
    for (int i = 0; i < maxConcurrentItems; i++) 
    { 
     threads[i] = new Thread(new ThreadStart(delegate 
     { 
      Action action = null; 
      do 
      { 
       lock (key) 
       { 
        if (queue.Count > 0) 
         action = queue.Dequeue(); 
        else 
         action = null; 
       } 
       if (action != null) 
       { 
        action(); 
       } 
      } while (action != null); 
     })); 
     threads[i].Start(); 
    } 

    for (int i = 0; i < maxConcurrentItems; i++) 
    { 
     threads[i].Join(); 
    } 
} 
+0

有趣的想法,但一些錯誤。第一個例子不會處理超過MaxConcurrentItems項目,因爲線程回調不會循環。在第二項中,線程將永遠持續下去,因爲在循環中'action'永遠不會設置爲'null'。 – 2013-02-28 12:19:13

+0

@JimMischel第一個選項不是*應該*循環。它是遞歸的,挺好的。它從自身內部自我調用,第二個調用將調用另一個,另一個等,直到隊列爲空,這是它處理所有項目的方式。第二個確實有一個小錯誤,這是正確的,我現在已經修復了這個問題。 – Servy 2013-02-28 14:54:46

+0

啊......現在我明白了。我沒有看到對'QueueUserWorkItem'的嵌入式調用。謝謝。 – 2013-02-28 15:33:50