我有一個場景,我需要非常快速地分發和處理作業。我將在隊列中快速填充大約45個作業,並且可以同時處理大約20個作業(5臺機器,每個4個核心)。每個工作都需要花費不同的時間,並且使垃圾收集變得複雜是一個問題,所以我需要能夠讓消費者脫機進行垃圾收集。訂閱隊列,收到1封郵件,然後取消訂閱
目前,我有一切工作與流行(每消費者每5ms彈出)。這似乎是不可取的,因爲它轉換爲rabbitmq每秒600個流行請求。
我會喜歡,如果它有一個彈出命令,就像訂閱,但只有一個消息。 (這個過程會阻塞,等待來自rabbitMQ連接的輸入,通過類似於Kernel.select的東西)
我試圖欺騙AMQP gem來做這樣的事情,但它不工作:我無法取消訂閱,直到隊列爲空,並且沒有更多消息正被髮送給消費者。我擔心的其他取消訂閱方法將會丟失信息。
consume_1.rb:
require "amqp"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
queue.unsubscribe { puts "unbound" }
sleep 3
end
end
consumer_many.rb:
require "amqp"
# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
end
end
producer.rb:
require "amqp"
i = 0
EventMachine.run do
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
EM.add_periodic_timer(1) do
msg = "Message #{i}"
i+=1
puts "~ publishing #{msg}"
end
end
我會推出consume_many.rb和producer.rb。消息將按預期流動。
當我啓動consume_1.rb時,它會獲取所有其他消息(如預期的那樣)。但它從來沒有取消訂閱,因爲它從來沒有完成處理所有的消息......所以它就這樣了。
如何獲取consume_1.rb訂閱隊列,獲取單個消息,然後將自己從負載平衡器環中取出,以便它可以完成工作,而不會丟失任何可能存在的額外待處理作業隊列,否則將被安排發送到過程?
添
編寫一個表現這種方式的服務器在ruby中會很愚蠢......也許使用zero-mq來實現我自己的代理是解決方案嗎? – 2011-04-15 23:22:47