我創建了一個類,其目的是抽象出對隊列的併發訪問控制。長時間運行任務被取消後如何正確清理
該類被設計爲在單個線程上實例化,由多個線程寫入,然後從後續單線程中讀取。
我有一個單獨的長時間運行的任務生成內部類將執行阻塞循環,並觸發事件,如果一個項目成功出列。
我的問題是:我的執行取消長時間運行的任務,並隨後清理/重置CancellationTokenSource
對象的正確用法?
理想情況下,我想要一個活動對象能夠被停止並重新啓動,同時保持可用性添加到隊列中。
我用彼得·布朗伯格的文章爲基礎:Producer/Consumer Queue and BlockingCollection in C# 4.0
下面的代碼:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Test
{
public delegate void DeliverNextQueuedItemHandler<T>(T item);
public sealed class SOQueueManagerT<T>
{
ConcurrentQueue<T> _multiQueue;
BlockingCollection<T> _queue;
CancellationTokenSource _canceller;
Task _listener = null;
public event DeliverNextQueuedItemHandler<T> OnNextItem;
public bool IsRunning { get; private set; }
public int QueueSize
{
get
{
if (_queue != null)
return _queue.Count;
return -1;
}
}
public CancellationTokenSource CancellationTokenSource
{
get
{
if (_canceller == null)
_canceller = new CancellationTokenSource();
return _canceller;
}
}
public SOQueueManagerT()
{
_multiQueue = new ConcurrentQueue<T>();
_queue = new BlockingCollection<T>(_multiQueue);
IsRunning = false;
}
public void Start()
{
if (_listener == null)
{
IsRunning = true;
_listener = Task.Factory.StartNew(() =>
{
while (!CancellationTokenSource.Token.IsCancellationRequested)
{
T item;
if (_queue.TryTake(out item, 100))
{
if (OnNextItem != null)
{
OnNextItem(item);
}
}
}
},
CancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
public void Stop()
{
if (_listener != null)
{
CancellationTokenSource.Cancel();
CleanUp();
}
}
public void Add(T item)
{
_queue.Add(item);
}
private void CleanUp()
{
_listener.Wait(2000);
if (_listener.IsCompleted)
{
IsRunning = false;
_listener = null;
_canceller = null;
}
}
}
}
UPDATE 這就是我到底有沒有用了。這並不完美,但迄今爲止正在完成這項工作。
public sealed class TaskQueueManager<T>
{
ConcurrentQueue<T> _multiQueue;
BlockingCollection<T> _queue;
CancellationTokenSource _canceller;
Task _listener = null;
public event DeliverNextQueuedItemHandler<T> OnNextItem;
public bool IsRunning
{
get
{
if (_listener == null)
return false;
else if (_listener.Status == TaskStatus.Running ||
_listener.Status == TaskStatus.Created ||
_listener.Status == TaskStatus.WaitingForActivation ||
_listener.Status == TaskStatus.WaitingToRun ||
_listener.IsCanceled)
return true;
else
return false;
}
}
public int QueueSize
{
get
{
if (_queue != null)
return _queue.Count;
return -1;
}
}
public TaskQueueManager()
{
_multiQueue = new ConcurrentQueue<T>();
_queue = new BlockingCollection<T>(_multiQueue);
}
public void Start()
{
if (_listener == null)
{
_canceller = new CancellationTokenSource();
_listener = Task.Factory.StartNew(() =>
{
while (!_canceller.Token.IsCancellationRequested)
{
T item;
if (_queue.TryTake(out item, 100))
{
if (OnNextItem != null)
{
try
{
OnNextItem(item);
}
catch (Exception e)
{
//log or call an event
}
}
}
}
},
_canceller.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
public void Stop()
{
if (_listener != null)
{
_canceller.Cancel();
if (_listener.IsCanceled && !_listener.IsCompleted)
_listener.Wait();
_listener = null;
_canceller = null;
}
}
public void Add(T item)
{
if (item != null)
{
_queue.Add(item);
}
else
{
throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
}
}
}
John:是的,我還發現多次調用Stop()可能會導致問題。我修改了Stop()方法,以便在返回之前等待任務完成。是的,這使得它成爲一個阻塞的呼叫,在這個階段它沒問題。我可能會提供對Stop方法的覆蓋,以提供轉發到Wait(超時)調用的超時。 您所做的IsRunning點是有效的。 – MattC 2011-03-10 10:22:53
@MattC你看代碼示例了嗎? – 2011-03-10 13:09:41
是的,我做了,這很有趣,我看到你做了什麼。我正在處理的具體問題是正確處理內部消費者任務的啓動,停止和重新啓動。在消費者停止時,隊列被添加到隊列中感到非常開心。 我已經完全刪除了CleanUp方法,並更加具體地基於任務狀態的IsRunning屬性。 我會離開這個開放只是爲了添加一個專門關於我使用TPL的答案。如果沒有,我會把你的。 – MattC 2011-03-10 13:32:25