2016-08-14 31 views
2

相當新的RabbitMQ,我試圖看看我能否實現我所需要的。RabbitMQ - 單個併發工作者每個路由密鑰

我正在尋找工作隊列模式,但有一個警告。我想只有一個工作者同時運行每個路由鍵。

澄清一個例子:

如果我的順序發送具有路由鍵下面的消息:aabc,我想只有3個工人同時運行。當收到第一個a消息時,工作人員將其拿起並處理。

當收到下一個a消息並且之前的a消息仍然處理(未確認)時,新的a消息應在隊列中等待。當接收到bc消息時,他們每個都會得到一個處理它們的工作人員。當第一個a消息被確認時,任何工人都可以拿起下一個a消息。

另一個澄清:

會以自然的方式使用RabbitMQ的(不站在我這一邊寫任何程序代碼來處理的鎖定和東西...)

編輯模式是可能的。所有員工都可以並且應該處理所有消息,並且我不希望爲每個員工設置一個隊列,因爲我想分擔他們之間的負載,而發佈者不知道哪個工作人員應該處理消息。但我確實希望確保沒有2名工人正在同時處理共享相同密鑰的消息。

例如,如果我有一個Publisher發佈帶有userId字段的郵件,我想確保沒有2個工作人員同時處理具有相同userId的郵件。

編輯2

擴大上userId例子。假設我有一個發佈者和3個工人。發佈者發佈如下消息:{ userId: 1, text: 'Hello' },其變化範圍爲userId s。我的3名工人都對這條消息做了同樣的事情,所以我可以讓他們中的任何一位處理消息。但是我試圖實現的是隻有一個工作人員處理來自某個用戶的消息時間。如果工作人員收到一條消息,並且正在處理該消息,並且收到另一條消息userId 1,我想確保沒有其他工作人員收到該消息。但其他消息與不同的userId s應該由其他可用的工作人員處理。

userId s是事先不知道的,發佈者不知道有多少員工是他們的具體情況,他只是想要安排消息進行處理。

回答

1

你的問題不能用路由密鑰來實現,而是通過一些設置構建到隊列中。

如果您爲a消息定義了「queue_a」,爲b消息定義了「queue_b」,那麼您可以讓儘可能多的消費者連接到它。

RabbitMQ只會將給定的消息傳遞給給定隊列的單個消費者。

它與單個隊列上的多個使用者一起工作的方式是基本的循環調度消息。也就是說,第一條消息將被傳遞給其中一個消費者,並且下一條消息(假設第一消費者仍然很忙)將被傳遞給下一個消費者。

因此,這應該滿足將消息傳遞給隊列中的任何給定消費者的需要。

爲了確保您的郵件能夠獲得任何消費者的同等機會(並且並非全部都是全部交付給同一個消費者),還需要執行其他一些設置。

首先,請務必將消息使用者no ack設置爲false(有時稱爲「自動確認」)。這會強制您從ack來自代碼的消息。

最後定下了居民消費的「消費預取」上限爲1

有了這種設置組合,單次消費將檢索一條消息,並開始做這個工作。當該消費者正在工作時,隊列中等待的任何消息將被傳遞給其他消費者(如果有的話)。如果沒有可用的消息,則消息將在隊列中等待,直到消費者可用。

有了這個,你應該能夠在給定的隊列上實現你想要的行爲。

...

請記住,這隻適用於隊列,但。路由密鑰不能通過這種方式進行管理。來自交換機的所有匹配的路由密鑰都會導致將消息的副本發送到目標隊列。

+0

以我的觀點來看,這個計算器是正確的。但是,即使是排隊的人也不會,因爲問題是「任何工人都可以拿起下一個信息」,這是不可能的。在這種情況下,每個隊列只有一個工作人員和一個路由密鑰。所以每次同一個工作人員拿起下一條消息。 – slowjack2k

+0

謝謝,Derick,但@ slowjack2k對我的意思是正確的。我給工人的所有信息都是一樣的,我不在乎(也不想關心)哪個工作人員收到信息。我只想確保只有一個工作人員根據某個鍵(我爲每條消息生成的)輸入一些消息。 –

+0

@NetanelG這兩個陳述並不一致'我的所有工作人員的消息都是相同的'和'基於某個鍵的消息(我爲每個消息生成的)'Aslo Derick的答案對於你問過的問題是正確的你問過它,但基於這個評論,我認爲你應該編輯它。 – cantSleepNow

相關問題