我正在使用RabbitMQ來管理多個執行持久任務的服務器。每個服務器可以偵聽一個或多個隊列,但每個服務器一次只能處理一個任務。RabbitMQ:一次有多個隊列/一個(長)任務
每當我在服務器中啓動使用者時,我都會使用channel.basic_qos(prefetch_count=1)
對其進行配置,以便只爲相應隊列處理一個任務。我們有: - 2個隊列:task1,task2。 - 2個服務器:server1,server2。 - 兩臺服務器均與task1和task2一起工作。
如果下一個消息在同一時間生產: - messageA爲tasks1 - messageB的任務2 - messageC爲tasks1
我期待什麼: - messageA得到由server1的 處理 - messageB被處理由server2 。 messageC保持排隊,直到其中一個服務器準備就緒(完成當前任務)。
我居然得到: - messageA得到由worker1 處理 - messageB得到由worker2 處理 - messageC得到由worker2(錯)
我沒有在同一時間開始的消費者處理。事實上,每個服務器中的工作任務都會不斷打開/關閉。大多數時間服務器使用不同的隊列(server1:tasks1,tasks2,task3; server2:tasks1,tasks5; server3:tasks2,tasks5;等等)。
我怎麼能做到這一點?
編輯 基於Olivier的回答: 任務是不同的。每個服務器都能夠處理一些任務,而不是全部。服務器一次只能處理一個任務。
我試過使用routing_keys進行交換,但是我發現了兩個問題:綁定到路由關鍵字task_i的所有服務器都會處理它的任務(我只需要處理它一次),並且如果沒有服務器綁定到task_i,那麼它的消息被刪除(我需要保持排隊,直到某個服務器可以處理它)。
感謝您的幫助Olivier。正如你所說,作爲能夠連接到多個隊列的服務器,問題是它實際上一次可以處理多個任務。我發現使用單個隊列的問題是每個任務都不相同。 task1任務與task2完全不同,只有能夠處理task1的服務器才能處理它(並且只有一個)。我嘗試使用與routing_keys的交換,但綁定到路由關鍵task1的每個服務器都將處理該任務(我只需要處理它一次)。我不需要能夠停止處理。 – marcocamejo
如果您只需要一臺服務器來處理給定的隊列/任務,您可能有興趣將您的客戶定義爲獨佔? – Olivier
我只需要一臺服務器來處理每個任務,但會有很多服務器能夠處理它。我的意思是:任務1的messageA應該只能由一個服務器處理,但可能有3或4個服務器在監聽任務1(可以選擇其中的任何一個來處理它)。 – marcocamejo