2009-01-12 137 views
55

尋找簡單線程池實現的一些示例代碼(C#)。C#中簡單線程池的代碼

我在codeproject上找到了一個,但代碼庫非常龐大,我不需要所有的功能。

無論如何,這是更多用於教育目的。

+4

簡短的回答是,除非是學習練習,否則你不應該推出自己的產品。如果這是一個學習練習,你會通過自己寫,而不是複製別人的代碼來學習更多。 :) – 2009-01-12 15:29:50

+2

@Greg:是否有任何情況下您可能想要一個獨立於現有標準ThreadPool的線程池? – AnthonyWJones 2009-01-12 15:32:51

+1

@Anthony:讀取Joe Duffy(和其他人)各種帖子中的內置線程池的內容,我相當有信心,任何我一起打的線程池將比已經存在的線程池弱得多。 – 2009-01-12 17:59:53

回答

28

沒有必要實現你自己的,因爲它不是很難使用現有的.NET實現。

http://msdn.microsoft.com/en-us/library/3dasc8as(VS.80).aspx

using System; 
using System.Threading; 

public class Fibonacci 
{ 
    public Fibonacci(int n, ManualResetEvent doneEvent) 
    { 
     _n = n; 
     _doneEvent = doneEvent; 
    } 

    // Wrapper method for use with thread pool. 
    public void ThreadPoolCallback(Object threadContext) 
    { 
     int threadIndex = (int)threadContext; 
     Console.WriteLine("thread {0} started...", threadIndex); 
     _fibOfN = Calculate(_n); 
     Console.WriteLine("thread {0} result calculated...", threadIndex); 
     _doneEvent.Set(); 
    } 

    // Recursive method that calculates the Nth Fibonacci number. 
    public int Calculate(int n) 
    { 
     if (n <= 1) 
     { 
      return n; 
     } 

     return Calculate(n - 1) + Calculate(n - 2); 
    } 

    public int N { get { return _n; } } 
    private int _n; 

    public int FibOfN { get { return _fibOfN; } } 
    private int _fibOfN; 

    private ManualResetEvent _doneEvent; 
} 

public class ThreadPoolExample 
{ 
    static void Main() 
    { 
     const int FibonacciCalculations = 10; 

     // One event is used for each Fibonacci object 
     ManualResetEvent[] doneEvents = new ManualResetEvent[FibonacciCalculations]; 
     Fibonacci[] fibArray = new Fibonacci[FibonacciCalculations]; 
     Random r = new Random(); 

     // Configure and launch threads using ThreadPool: 
     Console.WriteLine("launching {0} tasks...", FibonacciCalculations); 
     for (int i = 0; i < FibonacciCalculations; i++) 
     { 
      doneEvents[i] = new ManualResetEvent(false); 
      Fibonacci f = new Fibonacci(r.Next(20,40), doneEvents[i]); 
      fibArray[i] = f; 
      ThreadPool.QueueUserWorkItem(f.ThreadPoolCallback, i); 
     } 

     // Wait for all threads in pool to calculation... 
     WaitHandle.WaitAll(doneEvents); 
     Console.WriteLine("All calculations are complete."); 

     // Display the results... 
     for (int i= 0; i<FibonacciCalculations; i++) 
     { 
      Fibonacci f = fibArray[i]; 
      Console.WriteLine("Fibonacci({0}) = {1}", f.N, f.FibOfN); 
     } 
    } 
} 
47

這是最簡單的,天真的,線程池用於教育目的的實現,我可以想出(C#/。NET 3.5)。它沒有以任何方式使用.NET的線程池實現。

using System; 
using System.Collections.Generic; 
using System.Threading; 

namespace SimpleThreadPool 
{ 
    public sealed class Pool : IDisposable 
    { 
     public Pool(int size) 
     { 
      this._workers = new LinkedList<Thread>(); 
      for (var i = 0; i < size; ++i) 
      { 
       var worker = new Thread(this.Worker) { Name = string.Concat("Worker ", i) }; 
       worker.Start(); 
       this._workers.AddLast(worker); 
      } 
     } 

     public void Dispose() 
     { 
      var waitForThreads = false; 
      lock (this._tasks) 
      { 
       if (!this._disposed) 
       { 
        GC.SuppressFinalize(this); 

        this._disallowAdd = true; // wait for all tasks to finish processing while not allowing any more new tasks 
        while (this._tasks.Count > 0) 
        { 
         Monitor.Wait(this._tasks); 
        } 

        this._disposed = true; 
        Monitor.PulseAll(this._tasks); // wake all workers (none of them will be active at this point; disposed flag will cause then to finish so that we can join them) 
        waitForThreads = true; 
       } 
      } 
      if (waitForThreads) 
      { 
       foreach (var worker in this._workers) 
       { 
        worker.Join(); 
       } 
      } 
     } 

     public void QueueTask(Action task) 
     { 
      lock (this._tasks) 
      { 
       if (this._disallowAdd) { throw new InvalidOperationException("This Pool instance is in the process of being disposed, can't add anymore"); } 
       if (this._disposed) { throw new ObjectDisposedException("This Pool instance has already been disposed"); } 
       this._tasks.AddLast(task); 
       Monitor.PulseAll(this._tasks); // pulse because tasks count changed 
      } 
     } 

     private void Worker() 
     { 
      Action task = null; 
      while (true) // loop until threadpool is disposed 
      { 
       lock (this._tasks) // finding a task needs to be atomic 
       { 
        while (true) // wait for our turn in _workers queue and an available task 
        { 
         if (this._disposed) 
         { 
          return; 
         } 
         if (null != this._workers.First && object.ReferenceEquals(Thread.CurrentThread, this._workers.First.Value) && this._tasks.Count > 0) // we can only claim a task if its our turn (this worker thread is the first entry in _worker queue) and there is a task available 
         { 
          task = this._tasks.First.Value; 
          this._tasks.RemoveFirst(); 
          this._workers.RemoveFirst(); 
          Monitor.PulseAll(this._tasks); // pulse because current (First) worker changed (so that next available sleeping worker will pick up its task) 
          break; // we found a task to process, break out from the above 'while (true)' loop 
         } 
         Monitor.Wait(this._tasks); // go to sleep, either not our turn or no task to process 
        } 
       } 

       task(); // process the found task 
       lock(this._tasks) 
       { 
        this._workers.AddLast(Thread.CurrentThread); 
       } 
       task = null; 
      } 
     } 

     private readonly LinkedList<Thread> _workers; // queue of worker threads ready to process actions 
     private readonly LinkedList<Action> _tasks = new LinkedList<Action>(); // actions to be processed by worker threads 
     private bool _disallowAdd; // set to true when disposing queue but there are still tasks pending 
     private bool _disposed; // set to true when disposing queue and no more tasks are pending 
    } 


    public static class Program 
    { 
     static void Main() 
     { 
      using (var pool = new Pool(5)) 
      { 
       var random = new Random(); 
       Action<int> randomizer = (index => 
       { 
        Console.WriteLine("{0}: Working on index {1}", Thread.CurrentThread.Name, index); 
        Thread.Sleep(random.Next(20, 400)); 
        Console.WriteLine("{0}: Ending {1}", Thread.CurrentThread.Name, index); 
       }); 

       for (var i = 0; i < 40; ++i) 
       { 
        var i1 = i; 
        pool.QueueTask(() => randomizer(i1)); 
       } 
      } 
     } 
    } 
}