我有一個生產者/消費者隊列如下,但我得到ArgumentWException
。C#中生產者 - 消費者隊列中的ArgumentException
以下是代碼:
public class ProducerConsumer<T> where T : class
{
#region Private Variables
private Thread _workerThread;
private readonly Queue<T> _workQueue;
private object _enqueueItemLocker = new object();
private object _processRecordLocker = new object();
private readonly Action<T> _workCallbackAction;
private AutoResetEvent _workerWaitSignal;
#endregion
#region Constructor
public ProducerConsumer(Action<T> action)
{
_workQueue = new Queue<T>();
_workCallbackAction = action;
}
#endregion
#region Private Methods
private void ProcessRecord()
{
while (true)
{
T workItemToBeProcessed = default(T);
bool hasSomeWorkItem = false;
lock (_processRecordLocker)
{
hasSomeWorkItem = _workQueue.Count > 0;
if (hasSomeWorkItem)
{
workItemToBeProcessed = _workQueue.Dequeue();
if (workItemToBeProcessed == null)
{
return;
}
}
}
if (hasSomeWorkItem)
{
if (_workCallbackAction != null)
{
_workCallbackAction(workItemToBeProcessed);
}
}
else
{
_workerWaitSignal.WaitOne();
}
}
}
#endregion
#region Public Methods
/// <summary>
/// Enqueues work item in the queue.
/// </summary>
/// <param name="workItem">The work item.</param>
public void EnQueueWorkItem(T workItem)
{
lock (_enqueueItemLocker)
{
_workQueue.Enqueue(workItem);
if (_workerWaitSignal == null)
{
_workerWaitSignal = new AutoResetEvent(false);
}
_workerWaitSignal.Set();
}
}
/// <summary>
/// Stops the processer, releases wait handles.
/// </summary>
/// <param name="stopSignal">The stop signal.</param>
public void StopProcesser(AutoResetEvent stopSignal)
{
EnQueueWorkItem(null);
_workerThread.Join();
_workerWaitSignal.Close();
_workerWaitSignal = null;
if (stopSignal != null)
{
stopSignal.Set();
}
}
/// <summary>
/// Starts the processer, starts a new worker thread.
/// </summary>
public void StartProcesser()
{
if (_workerWaitSignal == null)
{
_workerWaitSignal = new AutoResetEvent(false);
}
_workerThread = new Thread(ProcessRecord) { IsBackground = true };
_workerThread.Start();
}
#endregion
}
另一類是:
public class Tester
{
private readonly ProducerConsumer<byte[]> _proConsumer;
public Tester()
{
_proConsumer = new ProducerConsumer<byte[]>(Display);
}
public void AddData(byte[] data)
{
try
{
_proConsumer.EnQueueWorkItem(recordData);
}
catch (NullReferenceException nre)
{
}
}
public void Start()
{
_proConsumer.StartProcesser();
}
private static object _recordLocker = new object();
private void Display(byte[] recordByteStream)
{
try
{
lock (_recordLocker)
{
Console.WriteLine("Done with data:" + BitConverter.ToInt32(recordByteStream, 0));
}
}
catch (Exception ex)
{
}
}
}
而我的主要功能:
class Program
{
private static Tester _recorder;
static void Main(string[] args)
{
_recorder = new Tester();
_recorder.StartRecording();
for (int i = 0; i < 100000; i++)
{
_recorder.AddRecordData(BitConverter.GetBytes(i));
}
Console.Read();
}
}
任何想法,爲什麼我得到的異常又該我確實要避免這種情況?
你在哪一行中得到異常? –
順便說一句 - 你可能想看看'BlockingCollection' - 它很好地處理這種類型的場景... –
@ReedCopsey我使用的是緊湊框架3.5 –