2016-04-22 95 views
3

我試圖設置一個RabbitMQ郵件隊列,以便我可以發送消息來啓動長時間運行的進程,並且還能夠發送消息以取消長時間運行的進程(如果需要)。於是我開始了一個EventingBasicConsumer,並沒有在我的Recieved處理程序是這樣的:BasicAck異步處理郵件

if (startProcess) 
{ 
    // start a long running process 
} 
else if (cancelProcess) 
{ 
    // cancel the currently running process 
} 
channel.BasicAck(ea.DeliveryTag, false); 

這不起作用,因爲EventingBasicConsumer不是多線程的,只能同時處理一個消息。所以它不能處理取消消息,直到它完成了長時間運行的過程(在這一點上,顯然沒有意義)。所以接下來我試過這個:

if (startProcess) 
{ 
    Task.Run(() => { 
     // start a long running process 
    } 
} 
else if (cancelProcess) 
{ 
    // cancel the currently running process 
} 
channel.BasicAck(ea.DeliveryTag, false); 

而且這個工作。我現在可以取消長時間運行的進程......但是,我承認請求立即運行長時間運行的進程,而不是在完成之後運行。這意味着如果長時間運行的進程崩潰,則消息已被刪除。所以這需要原始的發送者保持跟蹤並且讓接收者不得不發送消息來說完成了,這一切都變得有些複雜。

所以,我想也許我可以改變EventingBasicConsumer只是總是在一個新線程上激發它的Received事件。所以,我創建了這樣的事情:

public class AsyncRabbitConsumer : DefaultBasicConsumer 
{ 
    // code all the same as EventingBasicConsumer except this bit: 
    public override void HandleBasicDeliver(string consumerTag, 
     ulong deliveryTag, 
     bool redelivered, 
     string exchange, 
     string routingKey, 
     IBasicProperties properties, 
     byte[] body) 
    { 
     base.HandleBasicDeliver(consumerTag, 
      deliveryTag, 
      redelivered, 
      exchange, 
      routingKey, 
      properties, 
      body); 
     if (Received != null) 
     { 
      var args = new BasicDeliverEventArgs(consumerTag, 
        deliveryTag, 
        redelivered, 
        exchange, 
        routingKey, 
        properties, 
        body); 

      Task.Run(() => 
      { 
       Received(this, args); 
      }); 
     } 
    } 
} 

現在,在我的代碼第一個片段,我可以把它處理取消的消息,而長期運行的進程仍在運行長時間運行過程中不會ACK和刪除它的消息,直到它實際完成(或取消)。所以這應該是偉大的......除非我取消我得到這個:

型「RabbitMQ.Client.Exceptions.AlreadyClosedException」的異常出現在RabbitMQ.Client.dll但在用戶代碼

沒有處理

附加信息:已關閉:AMQP操作已中斷:由Peer發起的AMQP關閉原因,代碼= 406,text =「PRECONDITION_FAILED-未知交付標籤3」,classId = 60,methodId = 80,原因=

channel.BasicAck似乎是開始長時間運行的線程的步驟處理。那麼這裏發生了什麼?我認爲確認(首先取消消息,然後是長時間運行的過程消息)正在越過這裏。有沒有什麼體面的方法來解決這個問題?還是我吠叫錯了樹?

可能值得注意的是,取消長時間運行的過程並不是即時的。它會在下一個方便的地方取消,所以幾乎可以確定取消信息將在長時間運行過程結束之前完成處理。

+0

@Rob:正是因爲我有上面的例外。 –

+0

對不起 - 我不好,我在那部分上看了一遍。 – Rob

+0

你如何管理連接?它看起來像你的渠道被處置,這是什麼導致你的失敗。另外 - 有多少人在那裏?如果您有多個人,可能您的第二個工作人員正在取消取消,並試圖取消一個不存在的任務。您可以讓您的工作人員監視特定頻道上的取消,或指定路由鍵 – Rob

回答

0

你能做的就是像消費者一樣 - 第一個是長時間運行的過程,第二個是殺死長時間運行過程的代理。首先會收到消息,在處理完成後處理消息和ACK,如果檢測到消除信號,也會執行ACK。代理商顯然會收到取消消息並殺死第一個消息,併產生第一個消息的另一個實例。顯然這需要流程(消費者)在RMQ之外進行通信。

想到的另一件事(但我從來沒有嘗試過這樣的事情)是,您在消費者中將預取計數設置爲2,並且在「處理單個數據消息」時,將第二條消息發佈到代理(轉發),除非它是CANCEL消息,在這種情況下,在中止處理之後,您同時確認它們兩個 - CANCEL和DATA(以便像這樣調用)消息。

另一種選擇可能是在「長時間運行的過程」中,您有兩個消費者線程,每個線程都使用自己的通道。