9

我一直在努力一些異步等待的東西。我正在使用RabbitMQ在一些程序之間發送/接收消息。防止任務在某些線程上運行

作爲一種背景,RabbitMQ客戶端使用3個左右的線程,我可以看到:連接線程和兩個心跳線程。每當通過TCP接收到消息時,連接線程就會處理它並調用我通過接口提供的回調。該文件表示,最好避免在這次調用中做大量工作,因爲它在連接的同一線程上完成,而且事情需要繼續。他們提供了一個QueueingBasicConsumer,它有一個阻塞'出隊'方法,用於等待收到消息。

我希望我的消費者能夠在這段等待時間內真正釋放他們的線程上下文,以便其他人可以做一些工作,所以我決定使用異步/等待任務。我寫了一個AwaitableBasicConsumer類以下列方式使用TaskCompletionSource S:

我有一個awaitable出列方法:

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken) 
{ 
    //we are enqueueing a TCS. This is a "read" 
    rwLock.EnterReadLock(); 

    try 
    { 
     TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>(); 

     //if we are cancelled before we finish, this will cause the tcs to become cancelled 
     cancellationToken.Register(() => 
     { 
      tcs.TrySetCanceled(); 
     }); 

     //if there is something in the undelivered queue, the task will be immediately completed 
     //otherwise, we queue the task into deliveryTCS 
     if (!TryDeliverUndelivered(tcs)) 
      deliveryTCS.Enqueue(tcs); 
     } 

     return tcs.Task; 
    } 
    finally 
    { 
     rwLock.ExitReadLock(); 
    } 
} 

其中的RabbitMQ客戶端調用回調函數滿足任務:這就是所謂的從AMQP連接線程的上下文

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body) 
{ 
    //we want nothing added while we remove. We also block until everybody is done. 
    rwLock.EnterWriteLock(); 
    try 
    { 
     RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); 

     bool sent = false; 
     TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs; 
     while (deliveryTCS.TryDequeue(out tcs)) 
     { 
      //once we manage to actually set somebody's result, we are done with handling this 
      if (tcs.TrySetResult(e)) 
      { 
       sent = true; 
       break; 
      } 
     } 

     //if nothing was sent, we queue up what we got so that somebody can get it later. 
     /** 
     * Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the 
     * next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are 
     * doing our thing here. 
     */ 
     if (!sent) 
     { 
      undelivered.Enqueue(e); 
     } 
    } 
    finally 
    { 
     rwLock.ExitWriteLock(); 
    } 
} 

rwLockReaderWriterLockSlim。兩個隊列(deliveryTCSundelivered)是ConcurrentQueues。

問題:

每過一段時間,等待着出列方法拋出異常的方法。這通常不會成爲問題,因爲該方法也是async,因此它進入任務輸入的「異常」完成狀態。問題出現在調用DequeueAsync的任務在RabbitMQ客戶端創建的AMQP連接線程上等待後恢復的情況。通常我已經看到任務恢復到主線程或浮動的工作線程之一。但是,當它恢復到AMQP線程並拋出異常時,一切都會停滯。任務不會進入其「異常狀態」並且AMQP連接線程留下說它正在執行發生異常的方法。

我這裏主要的困惑是,爲什麼這不起作用:

var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards 

ConsumerTaskState state = new ConsumerTaskState() 
{ 
    Connection = connection, 
    CancellationToken = cancellationToken 
}; 

//if there is a problem, we execute our faulted method 
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called 
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted); 

這裏是RunAsync方法,建立了試驗:

public async Task RunAsync() 
{ 
    using (var channel = this.Connection.CreateModel()) 
    { 
     ... 
     AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel); 
     var result = consumer.DequeueAsync(this.CancellationToken); 

     //wait until we find something to eat 
     await result; 

     throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls 
     ... 
    } //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state 
} 

讀什麼我寫的,我看我可能沒有很好地解釋我的問題。如果需要澄清,請詢問。是

我的解決方案,我想出如下:

  • 刪除所有異步/等待代碼,只需使用直線上升線程和塊。性能會下降,但至少它不會失速
  • 某種程度上豁免了AMQP線程被用於恢復任務。我假設他們正在睡覺或什麼,然後默認TaskScheduler決定使用它們。如果我能找到一種方法來告訴任務調度程序,那些線程是不受限制的,那就太好了。

有沒有人有解釋爲什麼會發生這種情況或解決此問題的任何建議?現在我正在刪除異步代碼,以便程序可靠,但我真的很想了解這裏發生了什麼。

+0

*我希望我的消費者能夠在這段等待時間內真正釋放他們的線程上下文*。你是說等待線程阻止其他線程執行嗎?那看起來 。 。 。奇。 –

+0

RabbitMQ客戶端提供了一個「QueuingBasicConsumer」,它阻止了調用者,直到它收到一些東西。如果我要使用原始線程而不是任務,那個線程將會睡覺而不做任何事情。我想避免這種阻塞,以便底層線程可以在那段時間內完成另一個任務(增加吞吐量),所以我編寫了我的awaitable消費者,它使用任務而不是實際阻塞調用線程。 –

+1

我明白那個部分。我不明白的是你試圖解決的問題。如果'QueuingBasicConsumer'寫得很好,這是一個非繁忙的等待,被阻塞的線程在等待時不會佔用CPU週期,因此不會影響正在完成的其他任何工作。只需將這些線程專用於RabbitMQ並將其他線程用於其他作業。 –

回答

5

我首先建議您閱讀我的async intro,該文詳細解釋了await如何捕獲上下文並使用它來恢復執行。總之,它將捕獲當前SynchronizationContext(或當前TaskScheduler,如果SynchronizationContext.Currentnull)。

其他重要的細節是async延續計劃與TaskContinuationOptions.ExecuteSynchronously(@svick在評論中指出)。我有一個blog post about this但AFAIK它沒有正式記錄在任何地方。這個細節確實使得編寫一個async生產者/消費者隊列困難。

原因await不是「切換回原來的上下文」(可能),因爲RabbitMQ的線程SynchronizationContextTaskScheduler沒有做的 - 因此,繼續直接當你調用TrySetResult因爲這些線程執行一下就像普通的線程池線程一樣。

順便說一句,閱讀你的代碼,我懷疑你使用讀寫器鎖和併發隊列是不正確的。我不能確定沒有看到整個代碼,但這是我的印象。

我強烈建議你使用現有的async隊列,並圍繞它建立一個消費者(換句話說,讓別人去做難題:)。在TPL Dataflow中的BufferBlock<T>類型可以充當async隊列;如果您的平臺上有Dataflow,那將是我的第一個建議。否則,我有一個AsyncProducerConsumerQueue type in my AsyncEx library,或者你可以write your own(正如我在我的博客中所描述的)。

下面是一個使用BufferBlock<T>一個例子:

private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>(); 

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body) 
{ 
    RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); 
    _queue.Post(e); 
} 

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken) 
{ 
    return _queue.ReceiveAsync(cancellationToken); 
} 

在這個例子中,我讓你DequeueAsync API。但是,一旦您開始使用TPL Dataflow,請考慮在其他地方使用它。當你需要這樣一個隊列時,通常會發現代碼的其他部分也會受益於數據流方法。例如,您可以將BufferBlock鏈接到ActionBlock,而不是使用一堆方法調用DequeueAsync

+0

我確實使用了ReaderWriterLockSlim,這種方式可能並非打算。我曾經這樣說過,多人可以使用DequeueAsync,但是在處理事件時,除了處理程序之外,沒有人應該觸摸任何隊列。直到我添加'undelivered'隊列纔沒有必要,因此我不會錯過在調用DequeueAsync之前收到的消息。我用錯了嗎?這裏是完整的AwaitableBasicConsumer代碼:https://gist.github.com/kcuzner/7242836 –

+0

好吧,它看起來會起作用。但它並沒有讓你得到一個簡單的'鎖'。我看到另一個問題:'cancellationToken.Register'可以內聯執行它的委託,這會導致'SetResult'失敗。還有另一個我不願意留給斯蒂芬·圖布的模糊角落條件。 :) –

+0

還有一個後續問題:RabbitMQ以標準方式創建它的線程,只有一個'new Thread(new ThreadStart(...')。這些線程是否有資格在它們上運行任務,或者它們是否會在如果我可以知道這些任務不會在這些線程上運行(除非同步調用,就像我一樣),那麼我可以使用'BufferBlock '(我之前不知道),而不是不必擔心在同一個應用程序中使用TPL和RabbitMQ –

相關問題