我試圖讓我的頭一輪的RabbitMQ。我想要一個隊列來監聽,當它收到一條消息時,我希望它通過多條消息回覆一個通過reply_to
標題指定的匿名隊列。的RabbitMQ和EventMachine的::推遲
到目前爲止,我有以下任務,既對消費者和對reply_to
消息的用戶:
desc "start_consumer", "start the test consumer"
def start_consumer
puts "Running #{AMQP::VERSION} version of the gem"
AMQP.start(:host => "localhost", :user => "guest", :password => "guest", :vhost => "/",
:logging => true, :port => 5672) do |connection|
channel = AMQP::Channel.new(connection)
requests_queue = channel.queue("one", :exclusive => true, :auto_delete => true)
Signal.trap("INT") do
connection.close do
EM.stop{exit}
end
end
channel.prefetch(1)
requests_queue.subscribe(:ack => true) do |header, body|
puts "received in server #{body.inspect}"
(0..5).each do |n|
header.ack
reply = {:reply => "Respone #{n}", :is_last => (n == 5)}
AMQP::Exchange.default.publish(
MultiJson.encode(reply),
:routing_key => header.reply_to,
:correlation_id => header.correlation_id
)
sleep(2)
end
end
puts " [x] Awaiting RPC requests"
end
end
我調用的代碼是:
def publish(urlSearch, routing_key)
EM.run do
corr_id = rand(10_000_000).to_s
requests ||= Hash.new
connection = AMQP.connect(:host => "localhost")
callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => true)
callback_queue.subscribe do |header, body|
reply = MultiJson.decode(body)
if reply[:is_last.to_s]
connection.close do
EM.stop{exit}
end
end
end
callback_queue.append_callback(:declare) do
AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id)
end
end
的問題是,消息不是直到迭代中的所有消息都已發佈。
也就是說在(0..5)
循環迭代完成後,那麼所有的消息被公佈。
有人告訴我,使用EventMachine::defer
可能是一種選擇,但我不知道如何將其應用到循環。
任何人都可以提出如何解決這個問題的指針?是EventMachine::defer
是一個不錯的選擇,如果是這樣,當我使用AMQP.start
時該怎麼辦?
最後我用EM.defer https://gist.github.com/1480745。我正在使用JRuby,也許我應該考慮使用Java線程。 – dagda1