2012-01-06 50 views
0

我正在嘗試設置RabbitMQ rpc。我想要一個隊列來監聽,並且當它收到一條消息時,我希望它回覆一個匿名隊列,該隊列通過具有多條消息的reply_to頭部指定。EventMachine EM ::迭代器正在被rabbitmq RPC阻塞

我有以下托爾任務創建一個隊列,然後使用EM:迭代發送數量的消息返回到與replyt_to路由密鑰中指定的隊列:

desc "start_consumer", "start the test consumer" 
def start_consumer 
    conf = { 
    :host => "localhost", 
    :user => "guest", 
    :password => "guest", 
    :vhost => "/", 
    :logging => true, 
    :port => 5672 
    } 

    # n = 1 

    AMQP.start(conf) do |connection| 

    channel = AMQP::Channel.new(connection) 

    requests_queue = channel.queue("one") 
    requests_queue.purge 

    Signal.trap("INT") do 
     connection.close do 
     EM.stop{exit} 
     end 
    end 

    channel.prefetch(1) 

    requests_queue.subscribe(:ack => true) do |header, body| 
     url_search = MultiJson.decode(body) 

     EM::Iterator.new(0..5).each do |n, iter| 
     lead = get_lead(n, (n == 5)) 

     puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}" 

     AMQP::Exchange.default.publish(
             MultiJson.encode(lead), 
             :immediate => true, 
             :routing_key => header.reply_to, 
             :correlation_id => header.correlation_id 
            ) 

     iter.next 
     end 
    end 

    puts " [x] Awaiting RPC requests" 
    end   
end 

代碼beloow將消息發送到上面指定的隊列,並創建一個隊列,用於偵聽由EM :: Iterator代碼發送的消息。此隊列的名稱是第一個隊列reply_to標頭的路由鍵。

def publish(urlSearch, routing_key) 
    EM.run do 
    corr_id = rand(10_000_000).to_s 

    requests ||= Hash.new 

    connection = AMQP.connect(:host => "localhost") 

    callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false) 

    callback_queue.subscribe do |header, body| 
     lead = safe_json_decode(body) 

     puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}"    

     if lead["is_last"] 
     puts "in exit" 
     connection.close do 
      EM.stop{exit} 
     end 
     end 
    end 

    callback_queue.append_callback(:declare) do 
     AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id) 
    end 

    puts "initial message sent" 
    end 
end 

上面的代碼工作正如我想要的一個惱人的例外。有些東西阻止了EM :: Iterator代碼被異步執行。只有在EM :: Iterator代碼完成後纔會發送消息。我希望消息被異步發送,並在每次迭代後由匿名隊列處理。目前,只有在EM :: Iterator代碼完成其最後一次迭代後纔會發送所有消息。

任何人都可以看到我做錯了什麼或建議一種不同的方法嗎?我試過EM :: defer並且具有相同的行爲。

回答

0

紡紗一個新線程的回答我的問題:

Thread.new do 
    5.times do 
    lead = get_lead(n, (n == 5)) 

    puts "message #{n} is_last = #{lead.is_last} at #{Time.now}"; 

    AMQP::Exchange.default.publish(
            MultiJson.encode(lead), 
            :routing_key => header.reply_to, 
            :correlation_id => header.correlation_id 
           ) 

    n += 1 
    sleep(2) 
    end 
end 

創建一個新的線程停止EventMachine的反應器被阻塞和消息發送異步。

+0

EM.defer可能更合適,因爲它不需要產生線程,啓動數據庫連接等 – bbozo 2013-12-06 09:17:05