我一直在努力一些異步等待的東西。我正在使用RabbitMQ在一些程序之間發送/接收消息。防止任務在某些線程上運行
作爲一種背景,RabbitMQ客戶端使用3個左右的線程,我可以看到:連接線程和兩個心跳線程。每當通過TCP接收到消息時,連接線程就會處理它並調用我通過接口提供的回調。該文件表示,最好避免在這次調用中做大量工作,因爲它在連接的同一線程上完成,而且事情需要繼續。他們提供了一個QueueingBasicConsumer
,它有一個阻塞'出隊'方法,用於等待收到消息。
我希望我的消費者能夠在這段等待時間內真正釋放他們的線程上下文,以便其他人可以做一些工作,所以我決定使用異步/等待任務。我寫了一個AwaitableBasicConsumer
類以下列方式使用TaskCompletionSource
S:
我有一個awaitable出列方法:
public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
//we are enqueueing a TCS. This is a "read"
rwLock.EnterReadLock();
try
{
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();
//if we are cancelled before we finish, this will cause the tcs to become cancelled
cancellationToken.Register(() =>
{
tcs.TrySetCanceled();
});
//if there is something in the undelivered queue, the task will be immediately completed
//otherwise, we queue the task into deliveryTCS
if (!TryDeliverUndelivered(tcs))
deliveryTCS.Enqueue(tcs);
}
return tcs.Task;
}
finally
{
rwLock.ExitReadLock();
}
}
其中的RabbitMQ客戶端調用回調函數滿足任務:這就是所謂的從AMQP連接線程的上下文
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
//we want nothing added while we remove. We also block until everybody is done.
rwLock.EnterWriteLock();
try
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
bool sent = false;
TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
while (deliveryTCS.TryDequeue(out tcs))
{
//once we manage to actually set somebody's result, we are done with handling this
if (tcs.TrySetResult(e))
{
sent = true;
break;
}
}
//if nothing was sent, we queue up what we got so that somebody can get it later.
/**
* Without the rwlock, this logic would cause concurrency problems in the case where after the while block completes without sending, somebody enqueues themselves. They would get the
* next message and the person who enqueues after them would get the message received now. Locking prevents that from happening since nobody can add to the queue while we are
* doing our thing here.
*/
if (!sent)
{
undelivered.Enqueue(e);
}
}
finally
{
rwLock.ExitWriteLock();
}
}
rwLock
是ReaderWriterLockSlim
。兩個隊列(deliveryTCS
和undelivered
)是ConcurrentQueues。
問題:
每過一段時間,等待着出列方法拋出異常的方法。這通常不會成爲問題,因爲該方法也是async
,因此它進入任務輸入的「異常」完成狀態。問題出現在調用DequeueAsync
的任務在RabbitMQ客戶端創建的AMQP連接線程上等待後恢復的情況。通常我已經看到任務恢復到主線程或浮動的工作線程之一。但是,當它恢復到AMQP線程並拋出異常時,一切都會停滯。任務不會進入其「異常狀態」並且AMQP連接線程留下說它正在執行發生異常的方法。
我這裏主要的困惑是,爲什麼這不起作用:
var task = c.RunAsync(); //<-- This method awaits the DequeueAsync and throws an exception afterwards
ConsumerTaskState state = new ConsumerTaskState()
{
Connection = connection,
CancellationToken = cancellationToken
};
//if there is a problem, we execute our faulted method
//PROBLEM: If task fails when its resumed onto the AMQP thread, this method is never called
task.ContinueWith(this.OnFaulted, state, TaskContinuationOptions.OnlyOnFaulted);
這裏是RunAsync
方法,建立了試驗:
public async Task RunAsync()
{
using (var channel = this.Connection.CreateModel())
{
...
AwaitableBasicConsumer consumer = new AwaitableBasicConsumer(channel);
var result = consumer.DequeueAsync(this.CancellationToken);
//wait until we find something to eat
await result;
throw new NotImplementeException(); //<-- the test exception. Normally this causes OnFaulted to be called, but sometimes, it stalls
...
} //<-- This is where the debugger says the thread is sitting at when I find it in the stalled state
}
讀什麼我寫的,我看我可能沒有很好地解釋我的問題。如果需要澄清,請詢問。是
我的解決方案,我想出如下:
- 刪除所有異步/等待代碼,只需使用直線上升線程和塊。性能會下降,但至少它不會失速
- 某種程度上豁免了AMQP線程被用於恢復任務。我假設他們正在睡覺或什麼,然後默認
TaskScheduler
決定使用它們。如果我能找到一種方法來告訴任務調度程序,那些線程是不受限制的,那就太好了。
有沒有人有解釋爲什麼會發生這種情況或解決此問題的任何建議?現在我正在刪除異步代碼,以便程序可靠,但我真的很想了解這裏發生了什麼。
*我希望我的消費者能夠在這段等待時間內真正釋放他們的線程上下文*。你是說等待線程阻止其他線程執行嗎?那看起來 。 。 。奇。 –
RabbitMQ客戶端提供了一個「QueuingBasicConsumer」,它阻止了調用者,直到它收到一些東西。如果我要使用原始線程而不是任務,那個線程將會睡覺而不做任何事情。我想避免這種阻塞,以便底層線程可以在那段時間內完成另一個任務(增加吞吐量),所以我編寫了我的awaitable消費者,它使用任務而不是實際阻塞調用線程。 –
我明白那個部分。我不明白的是你試圖解決的問題。如果'QueuingBasicConsumer'寫得很好,這是一個非繁忙的等待,被阻塞的線程在等待時不會佔用CPU週期,因此不會影響正在完成的其他任何工作。只需將這些線程專用於RabbitMQ並將其他線程用於其他作業。 –