2011-03-09 75 views
5

我創建了一個類,其目的是抽象出對隊列的併發訪問控制。長時間運行任務被取消後如何正確清理

該類被設計爲在單個線程上實例化,由多個線程寫入,然後從後續單線程中讀取。

我有一個單獨的長時間運行的任務生成內部類將執行阻塞循環,並觸發事件,如果一個項目成功出列。

我的問題是:我的執行取消長時間運行的任務,並隨後清理/重置CancellationTokenSource對象的正確用法?

理想情況下,我想要一個活動對象能夠被停止並重新啓動,同時保持可用性添加到隊列中。

我用彼得·布朗伯格的文章爲基礎:Producer/Consumer Queue and BlockingCollection in C# 4.0

下面的代碼:

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Test 
{ 
    public delegate void DeliverNextQueuedItemHandler<T>(T item); 

public sealed class SOQueueManagerT<T> 
{ 

    ConcurrentQueue<T> _multiQueue; 
    BlockingCollection<T> _queue; 
    CancellationTokenSource _canceller; 
    Task _listener = null; 

    public event DeliverNextQueuedItemHandler<T> OnNextItem; 

    public bool IsRunning { get; private set; } 
    public int QueueSize 
    { 
     get 
     { 
      if (_queue != null) 
       return _queue.Count; 
      return -1; 
     } 
    } 

    public CancellationTokenSource CancellationTokenSource 
    { 
     get 
     { 
      if (_canceller == null) 
       _canceller = new CancellationTokenSource(); 

      return _canceller; 
     } 
    } 

    public SOQueueManagerT() 
    { 
     _multiQueue = new ConcurrentQueue<T>(); 
     _queue = new BlockingCollection<T>(_multiQueue); 

     IsRunning = false; 
    } 

    public void Start() 
    { 
     if (_listener == null) 
     { 


      IsRunning = true; 

      _listener = Task.Factory.StartNew(() => 
      { 

       while (!CancellationTokenSource.Token.IsCancellationRequested) 
       { 
        T item; 
        if (_queue.TryTake(out item, 100)) 
        { 
         if (OnNextItem != null) 
         { 

          OnNextItem(item); 
         } 
        } 

       } 
      }, 
      CancellationTokenSource.Token, 
      TaskCreationOptions.LongRunning, 
      TaskScheduler.Default); 
     } 
    } 

    public void Stop() 
    { 
     if (_listener != null) 
     { 
      CancellationTokenSource.Cancel(); 
      CleanUp(); 
     } 
    } 

    public void Add(T item) 
    { 
     _queue.Add(item); 
    } 

    private void CleanUp() 
    { 
     _listener.Wait(2000); 
     if (_listener.IsCompleted) 
     { 
      IsRunning = false; 
      _listener = null; 
      _canceller = null; 
     } 
    } 


} 
} 

UPDATE 這就是我到底有沒有用了。這並不完美,但迄今爲止正在完成這項工作。

public sealed class TaskQueueManager<T> 
{ 
    ConcurrentQueue<T> _multiQueue; 
    BlockingCollection<T> _queue; 
    CancellationTokenSource _canceller; 
    Task _listener = null; 

    public event DeliverNextQueuedItemHandler<T> OnNextItem; 

    public bool IsRunning 
    { 
     get 
     { 
      if (_listener == null) 
       return false; 
      else if (_listener.Status == TaskStatus.Running || 
       _listener.Status == TaskStatus.Created || 
       _listener.Status == TaskStatus.WaitingForActivation || 
       _listener.Status == TaskStatus.WaitingToRun || 
       _listener.IsCanceled) 
       return true; 
      else 
       return false; 
     } 
    } 
    public int QueueSize 
    { 
     get 
     { 
      if (_queue != null) 
       return _queue.Count; 
      return -1; 
     } 
    } 

    public TaskQueueManager() 
    { 
     _multiQueue = new ConcurrentQueue<T>(); 
     _queue = new BlockingCollection<T>(_multiQueue); 
    } 

    public void Start() 
    { 
     if (_listener == null) 
     { 
      _canceller = new CancellationTokenSource(); 

      _listener = Task.Factory.StartNew(() => 
      { 
       while (!_canceller.Token.IsCancellationRequested) 
       { 
        T item; 
        if (_queue.TryTake(out item, 100)) 
        { 
         if (OnNextItem != null) 
         { 
          try 
          { 
           OnNextItem(item); 
          } 
          catch (Exception e) 
          { 
           //log or call an event 
          } 
         } 
        } 
       } 
      }, 
      _canceller.Token, 
      TaskCreationOptions.LongRunning, 
      TaskScheduler.Default); 
     } 
    } 

    public void Stop() 
    { 
     if (_listener != null) 
     { 
      _canceller.Cancel(); 

      if (_listener.IsCanceled && !_listener.IsCompleted) 
       _listener.Wait(); 

      _listener = null; 
      _canceller = null; 
     } 
    } 

    public void Add(T item) 
    { 
     if (item != null) 
     { 
      _queue.Add(item); 
     } 
     else 
     { 
      throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null"); 
     } 
    } 
} 

回答

1

小心的編程是唯一會削減它。即使您取消了操作,您也可能會有一個未完成的操作,但這個操作在很短的時間內尚未完成。這很可能是一個阻塞操作,已經陷入僵局。在這種情況下,你的程序不會實際終止。例如,如果我多次調用CleanUp方法或者沒有先調用Start,我會感覺它會崩潰。在清理期間2秒超時,感覺比計劃更爲隨意,而且我會盡可能確保正常關閉或崩潰/掛起(你永遠不想讓併發的東西處於未知狀態)。

此外,IsRunning是明確設置,而不是從對象的狀態推斷。

爲了獲得靈感,我想讓你看看我最近編寫的一個類似的類,它是一個生產者/消費者模式,它在後臺線程中工作。您可以在CodePlex上找到該源代碼。雖然,這是專門解決一個非常具體的問題。

這裏,取消是通過設置一個特定的類型來解決的,只有工作者線程可以識別並開始關閉。這也確保我從不取消待處理工作,只考慮整個工作單元。

爲了改善這種情況,您可以爲當前工作單獨設置一個計時器,並在取消時終止或回滾未完成的工作。現在,執行交易類似的行爲將需要一些試驗和錯誤,因爲您需要查看每個可能的角落案例並問問自己,如果程序在此崩潰會發生什麼情況?理想情況下,所有這些代碼路徑都會導致可恢復或已知狀態,從而可以恢復工作。但是,我認爲你已經猜到了,這將需要仔細的編程和大量的測試。

+0

John:是的,我還發現多次調用Stop()可能會導致問題。我修改了Stop()方法,以便在返回之前等待任務完成。是的,這使得它成爲一個阻塞的呼叫,在這個階段它沒問題。我可能會提供對Stop方法的覆蓋,以提供轉發到Wait(超時)調用的超時。 您所做的IsRunning點是有效的。 – MattC 2011-03-10 10:22:53

+0

@MattC你看代碼示例了嗎? – 2011-03-10 13:09:41

+0

是的,我做了,這很有趣,我看到你做了什麼。我正在處理的具體問題是正確處理內部消費者任務的啓動,停止和重新啓動。在消費者停止時,隊列被添加到隊列中感到非常開心。 我已經完全刪除了CleanUp方法,並更加具體地基於任務狀態的IsRunning屬性。 我會離開這個開放只是爲了添加一個專門關於我使用TPL的答案。如果沒有,我會把你的。 – MattC 2011-03-10 13:32:25