如何使用AMQP gem在Ruby中擴展RabbitMQ使用者?水平和垂直縮放RabbitMQ消費者?
我閱讀了文檔,想出了一些(似乎)在一個簡單的例子中工作的東西 。示例水平縮放。新進程將 連接到代理並接收消息的子集。從那裏,每個進程可以啓動多個消費者線程。它使用文檔中描述的低級用戶界面 。
下面的代碼:
require 'amqp'
workers = ARGV[1] || 4
puts "Running #{workers} workers"
AMQP.start do |amqp|
channel = AMQP::Channel.new
channel.on_error do |conn, ex|
raise ex.reply_text
end
exchange = channel.fanout 'scaling.test', durable: true, prefetch: 1
queue = channel.queue("worker_queue", auto_delete: true).bind(exchange)
workers.times do |i|
consumer = AMQP::Consumer.new channel, queue, "consumer-#{i}", exclusive = false, manual_ack = false
consumer.consume.on_delivery do |meta, payload|
meta.ack
puts "Consumer #{consumer.consumer_tag} in #{Process.pid} got #{payload}"
end
end
trap('SIGTERM') do
amqp.start { EM.stop }
end
end
有幾件事情我不能確定的:
- 是否交換式回事?該文檔指出隊列之間的消息的直接交換負載平衡 。我使用直接和扇出交換來測試這個例子,它的功能相同。所以如果我想支持垂直和水平縮放,交換類型是否重要?
:prefetch
選項應該是什麼?我認爲一個人會是最好的。- 負載平衡如何專門工作?該文檔指出負載平衡發生在消費者之間而不在隊列之間。但是當我運行兩個 進程時,我可以看到進程1打印出來:「1,2,3,4」,然後處理兩個打印輸出 「5,6,7,8」。我認爲他們會失靈,或者渠道本身就是消費者? 根據輸出而不是文檔,這是有意義的。
- 從EventMachine的角度看,這看起來是否正確?我是否需要執行某種 線程池以使同一進程中的多個使用者正常工作?
Ivan,謝謝你的回覆! 追蹤您的觀點: 1.這就是我認爲的(關於直接交換),但該示例使用扇出或直接「正確」工作。 2.在這種情況下,我對'1' /'n'感到困惑嗎?工人!=消費者? – ahawkins
嗯,我懷疑fanout可能沒有像Ruby AMQP gem中設計的那樣工作 - 我的一個同事報告了同樣的行爲,但是我沒有得到調查的機會。 – Ivan
回覆:2.這更像是一個架構示例,因爲每個消費者都會花費代理(儘管最小),您可以使用1個消費者獲取所有作業,然後將其分配給您的流程中的工作線程。類似賽璐珞的東西在這裏可以很好地工作,但是因爲您已經在使用EventMachine來完成這項工作。在這種情況下,您希望消費者獲取與工作線程相同數量的作業,以便他們始終處於繁忙狀態。 – Ivan