我試圖設置一個RabbitMQ郵件隊列,以便我可以發送消息來啓動長時間運行的進程,並且還能夠發送消息以取消長時間運行的進程(如果需要)。於是我開始了一個EventingBasicConsumer
,並沒有在我的Recieved
處理程序是這樣的:BasicAck異步處理郵件
if (startProcess)
{
// start a long running process
}
else if (cancelProcess)
{
// cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);
這不起作用,因爲EventingBasicConsumer
不是多線程的,只能同時處理一個消息。所以它不能處理取消消息,直到它完成了長時間運行的過程(在這一點上,顯然沒有意義)。所以接下來我試過這個:
if (startProcess)
{
Task.Run(() => {
// start a long running process
}
}
else if (cancelProcess)
{
// cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);
而且這個工作。我現在可以取消長時間運行的進程......但是,我承認請求立即運行長時間運行的進程,而不是在完成之後運行。這意味着如果長時間運行的進程崩潰,則消息已被刪除。所以這需要原始的發送者保持跟蹤並且讓接收者不得不發送消息來說完成了,這一切都變得有些複雜。
所以,我想也許我可以改變EventingBasicConsumer
只是總是在一個新線程上激發它的Received
事件。所以,我創建了這樣的事情:
public class AsyncRabbitConsumer : DefaultBasicConsumer
{
// code all the same as EventingBasicConsumer except this bit:
public override void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
{
base.HandleBasicDeliver(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body);
if (Received != null)
{
var args = new BasicDeliverEventArgs(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body);
Task.Run(() =>
{
Received(this, args);
});
}
}
}
現在,在我的代碼第一個片段,我可以把它處理取消的消息,而長期運行的進程仍在運行和長時間運行過程中不會ACK和刪除它的消息,直到它實際完成(或取消)。所以這應該是偉大的......除非我取消我得到這個:
型「RabbitMQ.Client.Exceptions.AlreadyClosedException」的異常出現在RabbitMQ.Client.dll但在用戶代碼
沒有處理附加信息:已關閉:AMQP操作已中斷:由Peer發起的AMQP關閉原因,代碼= 406,text =「PRECONDITION_FAILED-未知交付標籤3」,classId = 60,methodId = 80,原因=
從channel.BasicAck
似乎是開始長時間運行的線程的步驟處理。那麼這裏發生了什麼?我認爲確認(首先取消消息,然後是長時間運行的過程消息)正在越過這裏。有沒有什麼體面的方法來解決這個問題?還是我吠叫錯了樹?
可能值得注意的是,取消長時間運行的過程並不是即時的。它會在下一個方便的地方取消,所以幾乎可以確定取消信息將在長時間運行過程結束之前完成處理。
@Rob:正是因爲我有上面的例外。 –
對不起 - 我不好,我在那部分上看了一遍。 – Rob
你如何管理連接?它看起來像你的渠道被處置,這是什麼導致你的失敗。另外 - 有多少人在那裏?如果您有多個人,可能您的第二個工作人員正在取消取消,並試圖取消一個不存在的任務。您可以讓您的工作人員監視特定頻道上的取消,或指定路由鍵 – Rob