2011-10-23 44 views
23

我想知道是否存在ConcurrentQueue的實現/包裝,類似於BlockingCollection,其中從集合中取出不會阻塞,而是取而代之的是異步,並會導致異步等待,直到項目被放置在隊列。awaitable基於任務的隊列

我已經想出了我自己的實現,但它似乎沒有按預期執行。我想知道如果我正在重塑已經存在的東西。

這裏是我的實現:

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 

    object queueSyncLock = new object(); 

    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> Dequeue() 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs=null; 
     T firstItem=default(T); 
     while (true) 
     { 
      bool ok; 
      lock (queueSyncLock) 
      { 
       ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); 
       if (ok) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        queue.TryDequeue(out firstItem); 
       } 
      } 
      if (!ok) break; 
      tcs.SetResult(firstItem); 
     } 
    } 
} 
+0

哦呸.... ...... –

+21

@AdamSack:確實,但你的評論並沒有幫助我。 – spender

回答

36

我不知道自由的手段的無鎖解決方案,但您可以查看新的Dataflow library,這是Async CTP的一部分。一個簡單的BufferBlock<T>應該足夠了,例如:

BufferBlock<int> buffer = new BufferBlock<int>(); 

生產和消費是最容易通過的數據流塊類型的擴展方法來完成。

生產是簡單的:

buffer.Post(13); 

和消費是異步就緒:

int item = await buffer.ReceiveAsync(); 

我建議,如果可能,您使用的數據流;使這樣一個緩衝區既有效又正確比第一次出現更困難。

+0

這看起來非常有希望...明天會看看。謝謝。它看起來非常像CCR端口。 – spender

+2

而不是在睡前偷看!看起來Dataflow非常適合我的需求。它似乎彌合了TPL提供的和CCR提供的(我曾經取得巨大成功的)之間的差距。這讓我感到樂觀的是,CCR的出色工作並沒有被浪費掉。這是正確的答案(還有一些閃亮的新東西讓我的牙齒進入!)謝謝@StephenCleary。 – spender

1

這可能是矯枉過正爲您的使用情況下(給出的學習曲線),但Reactive Extentions提供了所有你所能想異步組成的粘合劑。

您基本上訂閱了更改,它們會在您可用時推送給您,並且您可以讓系統在單獨的線程上推送更改。

+0

我至少對部分內容非常熟悉,但在生產中使用它有點難以理解,因爲其他人可能不得不維護代碼。我真的在挖掘異步/等待所帶來的簡單性,並將其引入到一個以前非常複雜的服務器產品中,我試圖將所有的異步技術都保存在一項技術之下。 – spender

-1

你可以只使用一個BlockingCollection(使用默認ConcurrentQueue),敷在調用TakeTask這樣你就可以await它:

var bc = new BlockingCollection<T>(); 

T element = await Task.Run(() => bc.Take()); 
+4

不錯的主意,但我不滿意阻止。我將有幾千個客戶,每個客戶都有自己的消息隊列。任何阻擋都會使船舶沉沒,因爲它會使線程無所事事。我想要一個可等待的,非阻塞任務的原因是,我可以在線程池中保留所有操作而不會導致線程池餓死。 – spender

0

這裏是我目前使用的實施。

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 
    object queueSyncLock = new object(); 
    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> DequeueAsync(CancellationToken ct) 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     ct.Register(() => 
     { 
      lock (queueSyncLock) 
      { 
       tcs.TrySetCanceled(); 
      } 
     }); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs = null; 
     T firstItem = default(T); 
     lock (queueSyncLock) 
     { 
      while (true) 
      { 
       if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem)) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        if (tcs.Task.IsCanceled) 
        { 
         continue; 
        } 
        queue.TryDequeue(out firstItem); 
       } 
       else 
       { 
        break; 
       } 
       tcs.SetResult(firstItem); 
      } 
     } 
    } 
} 

它的工作原理不夠好,但有相當多的爭論對queueSyncLock,因爲我做了很多使用CancellationToken的取消一些等待任務。當然,這導致相當少堵我將與BlockingCollection但看到...

我想知道如果有一個更流暢,鎖定殊途同歸,最終的

2

我atempt(它有創建一個「承諾」時引發的事件,它可以通過一個外部生產者可以用來知道什麼時候才能產生更多的項目):

public class AsyncQueue<T> 
{ 
    private ConcurrentQueue<T> _bufferQueue; 
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue; 
    private object _syncRoot = new object(); 

    public AsyncQueue() 
    { 
     _bufferQueue = new ConcurrentQueue<T>(); 
     _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); 
    } 

    /// <summary> 
    /// Enqueues the specified item. 
    /// </summary> 
    /// <param name="item">The item.</param> 
    public void Enqueue(T item) 
    { 
     TaskCompletionSource<T> promise; 
     do 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return;          
      } 
     } 
     while (promise != null); 

     lock (_syncRoot) 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return; 
      } 

      _bufferQueue.Enqueue(item); 
     }    
    } 

    /// <summary> 
    /// Dequeues the asynchronous. 
    /// </summary> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    /// <returns></returns> 
    public Task<T> DequeueAsync(CancellationToken cancellationToken) 
    { 
     T item; 

     if (!_bufferQueue.TryDequeue(out item)) 
     { 
      lock (_syncRoot) 
      { 
       if (!_bufferQueue.TryDequeue(out item)) 
       { 
        var promise = new TaskCompletionSource<T>(); 
        cancellationToken.Register(() => promise.TrySetCanceled()); 

        _promisesQueue.Enqueue(promise); 
        this.PromiseAdded.RaiseEvent(this, EventArgs.Empty); 

        return promise.Task; 
       } 
      } 
     } 

     return Task.FromResult(item); 
    } 

    /// <summary> 
    /// Gets a value indicating whether this instance has promises. 
    /// </summary> 
    /// <value> 
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>. 
    /// </value> 
    public bool HasPromises 
    { 
     get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; } 
    } 

    /// <summary> 
    /// Occurs when a new promise 
    /// is generated by the queue 
    /// </summary> 
    public event EventHandler PromiseAdded; 
} 
+0

我認爲這是最好的解決方案。我已經實現了這一點並進行了廣泛的測試。一些注意事項:對!promise.Task.IsCanceled的調用是不必要的。我添加了一個ManualResetEventSlim來跟蹤bufferQueue何時爲空,以便調用者可以阻止等待隊列清空。 –

+0

你[應該處置](http://stackoverflow.com/a/21653382/298609)'CancellationTokenRegistration'你從'cancellationToken.Register'調用中獲得。 – Paya