2013-12-10 45 views
0

如何使用AMQP gem在Ruby中擴展RabbitMQ使用者?水平和垂直縮放RabbitMQ消費者?

我閱讀了文檔,想出了一些(似乎)在一個簡單的例子中工作的東西 。示例水平縮放。新進程將 連接到代理並接收消息的子集。從那裏,每個進程可以啓動多個消費者線程。它使用文檔中描述的低級用戶界面 。

下面的代碼:

require 'amqp' 

workers = ARGV[1] || 4 

puts "Running #{workers} workers" 

AMQP.start do |amqp| 
    channel = AMQP::Channel.new 

    channel.on_error do |conn, ex| 
    raise ex.reply_text 
    end 

    exchange = channel.fanout 'scaling.test', durable: true, prefetch: 1 
    queue = channel.queue("worker_queue", auto_delete: true).bind(exchange) 

    workers.times do |i| 
    consumer = AMQP::Consumer.new channel, queue, "consumer-#{i}", exclusive = false, manual_ack = false 

    consumer.consume.on_delivery do |meta, payload| 
     meta.ack 
     puts "Consumer #{consumer.consumer_tag} in #{Process.pid} got #{payload}" 
    end 
    end 

    trap('SIGTERM') do 
    amqp.start { EM.stop } 
    end 
end 

有幾件事情我不能確定的:

  1. 是否交換式回事?該文檔指出隊列之間的消息的直接交換負載平衡 。我使用直接和扇出交換來測試這個例子,它的功能相同。所以如果我想支持垂直和水平縮放,交換類型是否重要?
  2. :prefetch選項應該是什麼?我認爲一個人會是最好的。
  3. 負載平衡如何專門工作?該文檔指出負載平衡發生在消費者之間而不在隊列之間。但是當我運行兩個 進程時,我可以看到進程1打印出來:「1,2,3,4」,然後處理兩個打印輸出 「5,6,7,8」。我認爲他們會失靈,或者渠道本身就是消費者? 根據輸出而不是文檔,這是有意義的。
  4. 從EventMachine的角度看,這看起來是否正確?我是否需要執行某種 線程池以使同一進程中的多個使用者正常工作?

回答

1

這其中大部分實際上是蓋在documentation像RabbitMQ的經紀人,但在回答你的問題:

  1. 對你最有可能想直接交流的工作隊列,也就是說,一個這會將消息(作業)準確地發送給一名工作人員,而不是一次向多名工作人員發送消息。但是這可能會根據你的工作而改變。根據定義,扇出應該將相同的消息路由給多個消費者。

  2. 此次設置的預取應爲1。一般來說,這要求代理人填寫消費者的網絡緩衝區,並填寫1消息直到確認。另一種設置是,您有1位消費者和n工作人員,在這種情況下,您可以將預取設置爲n。另外值得注意的是,在這樣的設置中,你不應該直到完成工作之後纔會確定。

  3. 負載均衡基本上是消費者之間的循環。這就是爲什麼你順序看到一切。如果每一項工作都需要不同的時間,你會看到分配的變化。

我希望有幫助。我有一段時間沒有對Ruby AMQP庫做任何事情 - 我們重寫了我們所有的workers in Go

+0

Ivan,謝謝你的回覆! 追蹤您的觀點: 1.這就是我認爲的(關於直接交換),但該示例使用扇出或直接「正確」工作。 2.在這種情況下,我對'1' /'n'感到困惑嗎?工人!=消費者? – ahawkins

+0

嗯,我懷疑fanout可能沒有像Ruby AMQP gem中設計的那樣工作 - 我的一個同事報告了同樣的行爲,但是我沒有得到調查的機會。 – Ivan

+0

回覆:2.這更像是一個架構示例,因爲每個消費者都會花費代理(儘管最小),您可以使用1個消費者獲取所有作業,然後將其分配給您的流程中的工作線程。類似賽璐珞的東西在這裏可以很好地工作,但是因爲您已經在使用EventMachine來完成這項工作。在這種情況下,您希望消費者獲取與工作線程相同數量的作業,以便他們始終處於繁忙狀態。 – Ivan