2012-08-14 60 views
16

我正在使用BlockingCollection來實現生產者/消費者模式。我有一個異步循環,用要處理的數據填充集合,然後在稍後時間由客戶端訪問。數據包到達稀疏,我希望輪詢在不使用阻塞呼叫的情況下完成。異步取消阻止收集

實質上,我正在尋找類似BeginTakeEndTake的東西,它不存在於阻塞集合中,以便我可以在回調中使用內部線程池。無論如何,它不一定是BlockingCollection。任何我需要的東西都會很棒。

這就是我現在得到的。 _bufferedPacketsBlockingCollection<byte[]>

public byte[] Read(int timeout) 
{ 
    byte[] result; 
    if (_bufferedPackets.IsCompleted) 
    { 
     throw new Exception("Out of packets"); 
    } 
    _bufferedPackets.TryTake(out result, timeout);  
    return result; 
} 

我想這是這樣的,在僞代碼:

public void Read(int timeout) 
{ 
    _bufferedPackets.BeginTake(result => 
     { 
      var bytes = _bufferedPackets.EndTake(result); 
      // Process the bytes, or the resuting timeout 
     }, timeout, _bufferedPackets); 
} 

什麼是我對這個選擇?我不希望任何線程處於等待狀態,因爲有很多其他IO東西要處理,並且我會很快耗盡線程。

更新:我已經重寫有問題的代碼,以不同的方式使用異步過程,實質上交換基礎上的回調,如果有超時限制內等待請求。這工作正常,但如果有一種方法可以做到這一點,而不依賴於定時器並交換周圍可能導致競爭條件且很難寫入(並理解)的lambda的方法,它仍然會很棒。我已經用自己的異步隊列實現解決了這個問題,但如果有更標準和經過充分測試的選項,它仍然非常棒。

+0

目前,我認爲沒有任何TPL集合提供除了ObservableCollection之外的異步方法。你怎麼看 ? – 2012-08-14 10:12:21

+0

你可以把它包裝在一個任務中,任務= Task.Factory.StartNew (()=> {//你的代碼返回字節[]});'但是這不是智慧,並且必須有更好的方法。 – MoonKnight 2012-08-14 10:16:32

+0

包裝在任務中將消耗一個將被鎖定在等待句柄中的任務。由於有很多任務正在進行,這將永遠佔用一項任務,這將使我不幸地在游泳池中耗盡任務。 – Dervall 2012-08-14 11:01:37

回答

0

所以看起來並不是一個內置的選擇,我出去努力做我想做的實驗。事實證明,爲了使這項工作與舊的異步模式的其他用戶大致相同,需要做大量的工作。

public class AsyncQueue<T> 
{ 
    private readonly ConcurrentQueue<T> queue; 
    private readonly ConcurrentQueue<DequeueAsyncResult> dequeueQueue; 

    private class DequeueAsyncResult : IAsyncResult 
    { 
     public bool IsCompleted { get; set; } 
     public WaitHandle AsyncWaitHandle { get; set; } 
     public object AsyncState { get; set; } 
     public bool CompletedSynchronously { get; set; } 
     public T Result { get; set; } 

     public AsyncCallback Callback { get; set; } 
    } 

    public AsyncQueue() 
    { 
     dequeueQueue = new ConcurrentQueue<DequeueAsyncResult>(); 
     queue = new ConcurrentQueue<T>(); 
    } 

    public void Enqueue(T item) 
    { 
     DequeueAsyncResult asyncResult; 
     while (dequeueQueue.TryDequeue(out asyncResult)) 
     { 
      if (!asyncResult.IsCompleted) 
      { 
       asyncResult.IsCompleted = true; 
       asyncResult.Result = item; 

       ThreadPool.QueueUserWorkItem(state => 
       { 
        if (asyncResult.Callback != null) 
        { 
         asyncResult.Callback(asyncResult); 
        } 
        else 
        { 
         ((EventWaitHandle) asyncResult.AsyncWaitHandle).Set(); 
        } 
       }); 
       return; 
      } 
     } 
     queue.Enqueue(item); 
    } 

    public IAsyncResult BeginDequeue(int timeout, AsyncCallback callback, object state) 
    { 
     T result; 
     if (queue.TryDequeue(out result)) 
     { 
      var dequeueAsyncResult = new DequeueAsyncResult 
      { 
       IsCompleted = true, 
       AsyncWaitHandle = new EventWaitHandle(true, EventResetMode.ManualReset), 
       AsyncState = state, 
       CompletedSynchronously = true, 
       Result = result 
      }; 
      if (null != callback) 
      { 
       callback(dequeueAsyncResult); 
      } 
      return dequeueAsyncResult; 
     } 

     var pendingResult = new DequeueAsyncResult 
     { 
      AsyncState = state, 
      IsCompleted = false, 
      AsyncWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset), 
      CompletedSynchronously = false, 
      Callback = callback 
     }; 
     dequeueQueue.Enqueue(pendingResult); 
     Timer t = null; 
     t = new Timer(_ => 
     { 
      if (!pendingResult.IsCompleted) 
      { 
       pendingResult.IsCompleted = true; 
       if (null != callback) 
       { 
        callback(pendingResult); 
       } 
       else 
       { 
        ((EventWaitHandle)pendingResult.AsyncWaitHandle).Set(); 
       } 
      } 
      t.Dispose(); 
     }, new object(), timeout, Timeout.Infinite); 

     return pendingResult; 
    } 

    public T EndDequeue(IAsyncResult result) 
    { 
     var dequeueResult = (DequeueAsyncResult) result; 
     return dequeueResult.Result; 
    } 
} 

我不是太肯定的IsComplete屬性的同步,而我不是如何dequeueQueue只拿到後續Enqueue調用清理太熱。我不確定什麼時候是正確的時間來等待手柄,但這是迄今爲止我所得到的最佳解決方案。

請不要以任何方式考慮此生產質量代碼。我只是想展示一下我如何保持所有線程不用等待鎖定而旋轉的一般要點。我確信這充滿了各種各樣的邊緣情況和錯誤,但它滿足了要求,我想回饋給那些遇到問題的人。

+0

我不完全理解你的線程模型。無論回調是否已完成或超時,您都可以在EndDequeue結果中進行訪問。如果你遍歷了IsCompleted屬性,直到你得到一個答案,你仍然阻塞一個線程。 – 2012-08-29 22:40:30

+0

它不會因爲它在一個定時器上運行而被阻塞,該定時器不會啓動線程,而只是在該池上排隊任務。 Enddequeue也會在超時時被調用,結果將在這些情況下被默認。 – Dervall 2012-08-30 05:11:43

+0

我的意思是誰叫EndDequeue?開始/結束模式確保您可以產生工作,但在某些時候需要調用相應的End方法。你目前所做的就是在Enqueue方法中註冊回調並調用它們,並在此期間通過定時器稍微等待一些數據的到來。你可以在沒有定時器的情況下通過一個觀察者隊列來完成它,在這個隊列中Enqueue存儲它們的入隊時間。當數據到達時,您將所有過時的觀察者出列並跳過它們並調用尚未到期的回調。 – 2012-08-30 06:32:45

0

我可能會誤解你的情況,但是你不能使用非阻塞集合嗎?

我創造了這個例子來說明:

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace AsyncTakeFromBlockingCollection 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var queue = new ConcurrentQueue<string>(); 

      var producer1 = Task.Factory.StartNew(() => 
      { 
       for (int i = 0; i < 10; i += 1) 
       { 
        queue.Enqueue("======="); 
        Thread.Sleep(10); 
       } 
      }); 

      var producer2 = Task.Factory.StartNew(() => 
      { 
       for (int i = 0; i < 10; i += 1) 
       { 
        queue.Enqueue("*******"); 
        Thread.Sleep(3); 
       } 
      }); 

      CreateConsumerTask("One ", 3, queue); 
      CreateConsumerTask("Two ", 4, queue); 
      CreateConsumerTask("Three", 7, queue); 

      producer1.Wait(); 
      producer2.Wait(); 
      Console.WriteLine(" Producers Finished"); 
      Console.ReadLine(); 
     } 

     static void CreateConsumerTask(string taskName, int sleepTime, ConcurrentQueue<string> queue) 
     { 
      Task.Factory.StartNew(() => 
      { 
       while (true) 
       { 
        string result; 
        if (queue.TryDequeue(out result)) 
        { 
         Console.WriteLine(" {0} consumed {1}", taskName, result); 
        } 
        Thread.Sleep(sleepTime); 
       } 
      }); 
     } 
    } 
} 

這裏是程序的輸出

enter image description here

我相信BlockingCollection旨在包裹併發收集和提供一種機制,允許多個消費者阻止;等待生產者。這種用法似乎與您的要求相悖。

我發現這article about the BlockingCollection class是有幫助的。

+0

不幸的是我不能這麼做。 IO完成時會有很多單獨的隊列被稀疏地填滿。可能有成千上萬種。這些僅在發生另一個IO事件時消耗,該事件在IO完成回調中運行,因此無法阻止。實質上,很少有生產者和消費者,都在IO完成上運行。消費者需要知道該集合是否在給定的超時內添加了項目,而不將自己置於阻塞調用中。 – Dervall 2012-08-24 06:22:43

+0

消費者如何映射到隊列?每個隊列有一名消費者?消費者是否遍歷隊列尋找產品? – rtev 2012-08-24 12:10:15

+0

消費者異步到達IO回調,並且需要將事情從集合中取出,或者等待一段時間,然後放棄在此期間集合沒有充滿數據的情況。沒有迭代,它基本上是一個隊列 – Dervall 2012-08-24 12:14:27

0

我很確定BlockingCollection<T>不能做到這一點,你不得不推出自己的。我想出了這個:

class NotifyingCollection<T> 
{ 
    private ConcurrentQueue<Action<T>> _subscribers = new ConcurrentQueue<Action<T>>(); 
    private ConcurrentQueue<T> _overflow = new ConcurrentQueue<T>(); 

    private object _lock = new object(); 

    public void Add(T item) 
    { 
     _overflow.Enqueue(item); 
     Dispatch(); 
    } 

    private void Dispatch() 
    { 
     // this lock is needed since we need to atomically dequeue from both queues... 
     lock (_lock) 
     { 
      while (_overflow.Count > 0 && _subscribers.Count > 0) 
      { 
       Action<T> callback; 
       T item; 

       var r1 = _overflow.TryDequeue(out item); 
       var r2 = _subscribers.TryDequeue(out callback); 

       Debug.Assert(r1 && r2); 
       callback(item); 
       // or, optionally so that the caller thread's doesn't take too long ... 
       Task.Factory.StartNew(() => callback(item)); 
       // but you'll have to consider how exceptions will be handled. 
      } 
     } 
    } 

    public void TakeAsync(Action<T> callback) 
    { 
     _subscribers.Enqueue(callback); 
     Dispatch(); 
    } 
} 

我用它調用TakeAsync()Add()充當回調線程的線程。當您撥打Add()TakeAsync()時,它會嘗試將所有排隊的項目分配到排隊的回叫。這樣就沒有創建線程,只是坐在那裏睡覺,等待被髮信號。

該鎖定有點醜陋,但您可以在不鎖定的情況下排隊並在多個線程上訂閱。我找不到一種方法來執行相當於的操作,只有在其他隊列上沒有使用該鎖的情況下才可以退出。

注意:我只用最少的幾個線程測試了它。

+0

嗯,是的。唯一的問題是,你將使用你的方法創建一個鎖定的線程,考慮到非常高的負載,這將是危險的。 TryTake呼叫塊。在您有成千上萬個這樣的調用的情況下,您將耗盡任務線程池中的線程並鎖定您的應用程序。 – Dervall 2012-08-29 19:32:59

+0

啊!你完全不需要阻塞線程;我以爲你只是想確保調用線程不會被阻塞。我現在明白了。 – atanamir 2012-08-29 20:29:04

+0

好的,修改我的答案,我認爲會做你正在尋找的... – atanamir 2012-08-29 21:38:21