2017-05-22 38 views
0

我正在使用RabbitMQ來管理多個執行持久任務的服務器。每個服務器可以偵聽一個或多個隊列,但每個服務器一次只能處理一個任務。RabbitMQ:一次有多個隊列/一個(長)任務

每當我在服務器中啓動使用者時,我都會使用channel.basic_qos(prefetch_count=1)對其進行配置,以便只爲相應隊列處理一個任務。我們有: - 2個隊列:task1,task2。 - 2個服務器:server1,server2。 - 兩臺服務器均與task1和task2一起工作

如果下一個消息在同一時間生產: - messageA爲tasks1 - messageB的任務2 - messageC爲tasks1

我期待什麼: - messageA得到由server1的 處理 - messageB被處理由server2 。 messageC保持排隊,直到其中一個服務器準備就緒(完成當前任務)。

我居然得到: - messageA得到由worker1 處理 - messageB得到由worker2 處理 - messageC得到由worker2(錯)

我沒有在同一時間開始的消費者處理。事實上,每個服務器中的工作任務都會不斷打開/關閉。大多數時間服務器使用不同的隊列(server1:tasks1,tasks2,task3; server2:tasks1,tasks5; server3:tasks2,tasks5;等等)。

我怎麼能做到這一點?

編輯 基於Olivier的回答: 任務是不同的。每個服務器都能夠處理一些任務,而不是全部。服務器一次只能處理一個任務。

我試過使用routing_keys進行交換,但是我發現了兩個問題:綁定到路由關鍵字task_i的所有服務器都會處理它的任務(我只需要處理它一次),並且如果沒有服務器綁定到task_i,那麼它的消息被刪除(我需要保持排隊,直到某個服務器可以處理它)。

回答

0

從您提供的描述看來,問題是由於您的服務器同時連接到多個隊列造成的。

由於預取計數設置爲1,連接到3個隊列的服務器將消耗多達3條消息,即使他每次只處理一個消息(根據您的處理描述)。

這不是從你的問題不清楚是否有需要多隊列或者您是否可以將所有的任務在一個單一的隊列中結束的:

  • 做所有的服務器消耗的所有任務
  • 做您需要能夠停止某些任務的處理

如果您需要/希望能夠「停止」某些任務的處理,或控制整個服務器中的處理分佈,則需要管理你的服務器中的消費者一次只有一個活動消費者(否則由於預取1,您將阻止/消費一些消息)。

如果您不需要控制各種任務的處理,將所有消息最終放在單個隊列中會更簡單,並且單個使用者將該隊列設置爲每個服務器預取一個。

+0

感謝您的幫助Olivier。正如你所說,作爲能夠連接到多個隊列的服務器,問題是它實際上一次可以處理多個任務。我發現使用單個隊列的問題是每個任務都不相同。 task1任務與task2完全不同,只有能夠處理task1的服務器才能處理它(並且只有一個)。我嘗試使用與routing_keys的交換,但綁定到路由關鍵task1的每個服務器都將處理該任務(我只需要處理它一次)。我不需要能夠停止處理。 – marcocamejo

+0

如果您只需要一臺服務器來處理給定的隊列/任務,您可能有興趣將您的客戶定義爲獨佔? – Olivier

+0

我只需要一臺服務器來處理每個任務,但會有很多服務器能夠處理它。我的意思是:任務1的messageA應該只能由一個服務器處理,但可能有3或4個服務器在監聽任務1(可以選擇其中的任何一個來處理它)。 – marcocamejo

相關問題