2011-11-25 30 views
0

我試圖讓我的頭一輪的RabbitMQ。我想要一個隊列來監聽,當它收到一條消息時,我希望它通過多條消息回覆一個通過reply_to標題指定的匿名隊列。的RabbitMQ和EventMachine的::推遲

到目前爲止,我有以下任務,既對消費者和對reply_to消息的用戶:

desc "start_consumer", "start the test consumer" 
def start_consumer 
    puts "Running #{AMQP::VERSION} version of the gem" 

     AMQP.start(:host => "localhost", :user => "guest", :password => "guest", :vhost => "/", 
         :logging => true, :port => 5672) do |connection| 

     channel = AMQP::Channel.new(connection) 

     requests_queue = channel.queue("one", :exclusive => true, :auto_delete => true) 

     Signal.trap("INT") do 
      connection.close do 
      EM.stop{exit} 
      end 
     end 

     channel.prefetch(1) 

     requests_queue.subscribe(:ack => true) do |header, body| 
      puts "received in server #{body.inspect}" 

      (0..5).each do |n| 
      header.ack 

      reply = {:reply => "Respone #{n}", :is_last => (n == 5)} 

      AMQP::Exchange.default.publish(
              MultiJson.encode(reply), 
              :routing_key => header.reply_to, 
              :correlation_id => header.correlation_id 
             ) 


      sleep(2) 
      end 
     end 

     puts " [x] Awaiting RPC requests" 
     end   
    end 

我調用的代碼是:

def publish(urlSearch, routing_key) 
    EM.run do 

     corr_id = rand(10_000_000).to_s 

     requests ||= Hash.new 

     connection = AMQP.connect(:host => "localhost") 

     callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => true)    

     callback_queue.subscribe do |header, body| 

     reply = MultiJson.decode(body)    

     if reply[:is_last.to_s] 
      connection.close do 
      EM.stop{exit} 
      end 
     end 
     end       

     callback_queue.append_callback(:declare) do 
     AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id) 
     end 

    end 

的問題是,消息不是直到迭代中的所有消息都已發佈。

也就是說在(0..5)循環迭代完成後,那麼所有的消息被公佈。

有人告訴我,使用EventMachine::defer可能是一種選擇,但我不知道如何將其應用到循環。

任何人都可以提出如何解決這個問題的指針?是EventMachine::defer是一個不錯的選擇,如果是這樣,當我使用AMQP.start時該怎麼辦?

回答

0

的是要記住,當與EM的工作是,反應堆本身是單線程的。所以,等到循環完成之前,等待的原因是循環佔用了單個線程。 (你真的,真的,真的不想在EM應用程序中使用的睡眠。你擋住了整個反應器和什麼都不會發生。)

如果你想在消息公佈,或其他任何真正的其他EM要發生的動作(定時器被觸發,收到其他數據等),你必須讓反應堆循環。

您可以使用類似的EM:迭代安排工作對你來說,是這樣的:

EM::Iterator.new(0..5).each do |n, iter| 
    reply = {:reply => "Response #{n}", :is_last => (n == 5)} 
    AMQP::Exchange.default.publish(MultiJson.encode(reply), 
           :routing_key => header.reply_to, 
           :correlation_id => header.correlation_id) 
    iter.next 
end 
header.ack 
+0

最後我用EM.defer https://gist.github.com/1480745。我正在使用JRuby,也許我應該考慮使用Java線程。 – dagda1