2009-04-09 44 views
12

如何在C#中使用事件和代表實現生產者/消費者模式?在使用這些設計模式時,我需要注意哪些問題?我需要注意哪些邊緣情況?在C#中實現生產者/消費者模式

+5

這是一個有效的問題國際海事組織。我不認爲把一個問題當作家庭作業是不禮貌的。 – Sung 2009-04-09 12:52:29

+5

@Sung Meister:我不能不同意你的意見! – 2009-04-09 21:07:41

+5

作業與否,這是一個常見問題,值得回答。 – mpen 2010-09-25 05:15:04

回答

1

我知道這個線程是很有點老了,但因爲我有時會在我的搜索碰到它,我決定分享這個生產者 - 消費者對人們想知道如何實現一個簡單通用的生產者 - 消費者代碼作業隊列。

作業類用於以代理的形式'存儲'對象的方法調用。然後在處理作業時調用該委託。任何相關參數也存儲在此Job類中。

有了這個簡單的模式,就有可能在入隊和出隊過程中實現多線程。其實這只是最簡單的部分:多線程給你的代碼帶來了新的挑戰,你會注意到它們以後;-)

我原來在thread上發佈了這段代碼。

using System; 
using System.Collections.Concurrent; 
using System.Diagnostics; 
using System.Threading; 

// Compiled and tested in: Visual Studio 2017, DotNET 4.6.1 

namespace MyNamespace 
{ 
    public class Program 
    { 
     public static void Main(string[] args) 
     { 
      MyApplication app = new MyApplication(); 
      app.Run(); 
     } 
    } 

    public class MyApplication 
    { 
     private BlockingCollection<Job> JobQueue = new BlockingCollection<Job>(); 
     private CancellationTokenSource JobCancellationTokenSource = new CancellationTokenSource(); 
     private CancellationToken JobCancellationToken; 
     private Timer Timer; 
     private Thread UserInputThread; 



     public void Run() 
     { 
      // Give a name to the main thread: 
      Thread.CurrentThread.Name = "Main"; 

      // Fires a Timer thread: 
      Timer = new Timer(new TimerCallback(TimerCallback), null, 1000, 2000); 

      // Fires a thread to read user inputs: 
      UserInputThread = new Thread(new ThreadStart(ReadUserInputs)) 
      { 
       Name = "UserInputs", 
       IsBackground = true 
      }; 
      UserInputThread.Start(); 

      // Prepares a token to cancel the job queue: 
      JobCancellationToken = JobCancellationTokenSource.Token; 

      // Start processing jobs: 
      ProcessJobs(); 

      // Clean up: 
      JobQueue.Dispose(); 
      Timer.Dispose(); 
      UserInputThread.Abort(); 

      Console.WriteLine("Done."); 
     } 



     private void ProcessJobs() 
     { 
      try 
      { 
       // Checks if the blocking collection is still up for dequeueing: 
       while (!JobQueue.IsCompleted) 
       { 
        // The following line blocks the thread until a job is available or throws an exception in case the token is cancelled: 
        JobQueue.Take(JobCancellationToken).Run(); 
       } 
      } 
      catch { } 
     } 



     private void ReadUserInputs() 
     { 
      // User input thread is running here. 
      ConsoleKey key = ConsoleKey.Enter; 

      // Reads user inputs and queue them for processing until the escape key is pressed: 
      while ((key = Console.ReadKey(true).Key) != ConsoleKey.Escape) 
      { 
       Job userInputJob = new Job("UserInput", this, new Action<ConsoleKey>(ProcessUserInputs), key); 
       JobQueue.Add(userInputJob); 
      } 
      // Stops processing the JobQueue: 
      JobCancellationTokenSource.Cancel(); 
     } 

     private void ProcessUserInputs(ConsoleKey key) 
     { 
      // Main thread is running here. 
      Console.WriteLine($"You just typed '{key}'. (Thread: {Thread.CurrentThread.Name})"); 
     } 



     private void TimerCallback(object param) 
     { 
      // Timer thread is running here. 
      Job job = new Job("TimerJob", this, new Action<string>(ProcessTimer), "A job from timer callback was processed."); 
      JobQueue.TryAdd(job); // Just enqueues the job for later processing 
     } 

     private void ProcessTimer(string message) 
     { 
      // Main thread is running here. 
      Console.WriteLine($"{message} (Thread: {Thread.CurrentThread.Name})"); 
     } 
    } 



    /// <summary> 
    /// The Job class wraps an object's method call, with or without arguments. This method is called later, during the Job execution. 
    /// </summary> 
    public class Job 
    { 
     public string Name { get; } 
     private object TargetObject; 
     private Delegate TargetMethod; 
     private object[] Arguments; 

     public Job(string name, object obj, Delegate method, params object[] args) 
     { 
      Name = name; 
      TargetObject = obj; 
      TargetMethod = method; 
      Arguments = args; 
     } 

     public void Run() 
     { 
      try 
      { 
       TargetMethod.Method.Invoke(TargetObject, Arguments); 
      } 
      catch(Exception ex) 
      { 
       Debug.WriteLine($"Unexpected error running job '{Name}': {ex}"); 
      } 
     } 

    } 
}