2012-12-11 63 views
4

enter image description here我有一個生產者/消費者隊列如下,但我得到ArgumentWExceptionC#中生產者 - 消費者隊列中的ArgumentException

以下是代碼:

public class ProducerConsumer<T> where T : class 
    { 
     #region Private Variables 
     private Thread _workerThread; 
     private readonly Queue<T> _workQueue; 
     private object _enqueueItemLocker = new object(); 
     private object _processRecordLocker = new object(); 
     private readonly Action<T> _workCallbackAction; 
     private AutoResetEvent _workerWaitSignal; 
     #endregion 

     #region Constructor 
     public ProducerConsumer(Action<T> action) 
     { 
      _workQueue = new Queue<T>(); 
      _workCallbackAction = action; 

     } 
     #endregion 
     #region Private Methods 
     private void ProcessRecord() 
     { 
      while (true) 
      {    
       T workItemToBeProcessed = default(T); 
       bool hasSomeWorkItem = false; 
       lock (_processRecordLocker) 
       { 
        hasSomeWorkItem = _workQueue.Count > 0; 

        if (hasSomeWorkItem) 
        { 
         workItemToBeProcessed = _workQueue.Dequeue(); 
         if (workItemToBeProcessed == null) 
         { 
          return; 
         } 
        } 
       } 
       if (hasSomeWorkItem) 
       { 
        if (_workCallbackAction != null) 
        { 
         _workCallbackAction(workItemToBeProcessed); 
        } 
       } 
       else 
       { 
        _workerWaitSignal.WaitOne(); 
       } 
      } 
     } 
     #endregion 

     #region Public Methods 
     /// <summary> 
     /// Enqueues work item in the queue. 
     /// </summary> 
     /// <param name="workItem">The work item.</param> 
     public void EnQueueWorkItem(T workItem) 
     { 
      lock (_enqueueItemLocker) 
      {    
       _workQueue.Enqueue(workItem); 

       if (_workerWaitSignal == null) 
       { 
        _workerWaitSignal = new AutoResetEvent(false); 
       } 

       _workerWaitSignal.Set(); 
      } 
     } 
     /// <summary> 
     /// Stops the processer, releases wait handles. 
     /// </summary> 
     /// <param name="stopSignal">The stop signal.</param> 
     public void StopProcesser(AutoResetEvent stopSignal) 
     { 
      EnQueueWorkItem(null); 

      _workerThread.Join(); 
      _workerWaitSignal.Close(); 
      _workerWaitSignal = null; 

      if (stopSignal != null) 
      { 
       stopSignal.Set(); 
      } 
     } 
     /// <summary> 
     /// Starts the processer, starts a new worker thread. 
     /// </summary> 
     public void StartProcesser() 
     { 
      if (_workerWaitSignal == null) 
      { 
       _workerWaitSignal = new AutoResetEvent(false); 
      } 
      _workerThread = new Thread(ProcessRecord) { IsBackground = true }; 
      _workerThread.Start(); 
     } 
     #endregion 
    } 

另一類是:

public class Tester 
{ 
    private readonly ProducerConsumer<byte[]> _proConsumer; 
    public Tester() 
    { 
     _proConsumer = new ProducerConsumer<byte[]>(Display); 
    } 
    public void AddData(byte[] data) 
    { 
     try 
     { 
      _proConsumer.EnQueueWorkItem(recordData); 
     } 
     catch (NullReferenceException nre) 
     { 

     } 
    } 
    public void Start() 
    { 
     _proConsumer.StartProcesser(); 
    } 

    private static object _recordLocker = new object(); 

    private void Display(byte[] recordByteStream) 
    { 
     try 
     { 
      lock (_recordLocker) 
      { 
       Console.WriteLine("Done with data:" + BitConverter.ToInt32(recordByteStream, 0)); 

      } 

     } 
     catch (Exception ex) 
     { 

     } 

    } 
} 

而我的主要功能:

class Program 
    { 
     private static Tester _recorder; 
     static void Main(string[] args) 
     { 
      _recorder = new Tester(); 
      _recorder.StartRecording(); 

      for (int i = 0; i < 100000; i++) 
      { 
       _recorder.AddRecordData(BitConverter.GetBytes(i));    
      } 

      Console.Read(); 
     } 
    } 

任何想法,爲什麼我得到的異常又該我確實要避免這種情況?

+0

你在哪一行中得到異常? –

+1

順便說一句 - 你可能想看看'BlockingCollection ' - 它很好地處理這種類型的場景... –

+0

@ReedCopsey我使用的是緊湊框架3.5 –

回答

8

在當前實現中,您的類不是線程安全的。您正在爲您的Enqueuelock (_enqueueItemLocker))和Dequeuelock (_processRecordLocker))調用使用兩個不同的對象,這會在您的Queue<T>中創建競爭條件。

你需要鎖定同一個對象實例兩個電話,以安全地使用隊列。

如果您使用的是.NET 4,我建議使用ConcurrentQueue<T>BlockingCollection<T>來代替,因爲它們可以消除代碼中對鎖的需求,因爲它們是線程安全的。

+0

我同意,違反線程安全的所有投注都關閉,這可能是競賽條件的副作用。 –

+0

@ScottChamberlain'隊列'在內部使用一個數組 - 看起來這只是一個從入隊/出隊模擬被調用的競爭條件等等。 –