2011-10-14 58 views
4

我有一個簡單生產者/消費者AMQP設置是這樣的:是否有用於提交/回滾消息處理的amqp體系結構的設計模式?

producer -> e1:jobs_queue -> consumer -> e2:results_queue -> result_handler 

生產者一定數量的就業崗位的發送。消費者一次一個地取下工作,並對其進行處理,並將結果推送到另一個隊列中。然後由result_handler將結果發佈到數據庫。

有時消費者失敗 - 它可能被操作系統殺死或拋出異常。如果在處理消息時發生這種情況,則此消息丟失,不會產生相應的結果,我很難過。如果失敗的工作重新排隊,我會很高興。

什麼我要找的是確保一個設計模式,要麼消費者處理該作業完成,並提出相應的結果爲* results_queue *,或者如果它失敗作業放回* jobs_queue * 。由於消費者是什麼故障,消費者不應負責管理任何與其自身監督有關的消息。

我們知道,消費者未能處理作業,如果:

  • 從* job_queue *找了一份工作,有些超時
  • 後沒有結果已經生產出來了工作,從* job_queue *然後死亡

對於我的應用程序,我們可能通過簡單地等待處理作業超時來捕獲第二個案例。在生產中,將有許多工人來監督,所有工作都從共同的工作清單中提取出來,並將結果放入一個結果交換/隊列中。

+0

你是否自動確認郵件?大概你可以關閉自動確認,然後在成功處理作業後確認,從而讓兔子處理重新發送的消息。 –

+0

可靠消息傳遞,持久消息隊列,確認傳遞,兩階段確認(接收和保存/轉發)以及其他幾種模式有幾種模式。對於您的平臺和需求的詳細瞭解,這不是一個簡單的解決方案。 –

回答

2

實現您想要的最簡單的方法是手動處理收到的消息的確認。在node-amqp這很簡單,只需將選項{ ack: true }添加到queue.subscribe調用。然後你可以通過調用隊列上的某個函數來確認消息。在node-amqp的情況下,它是queue.shift()

您還可以使用prefetchCount來設置消費者被允許的尚未確認消息的數量。

如果消費者斷開連接,任何未確認的消息現在都會重新發送給任何連接的消費者。

通過將隊列設置爲durableautoDelete: false,還可以確保在重新啓動MQ服務器或斷開最後一位使用者時,隊列(及其上的消息)不會被刪除。

相關問題