2012-11-02 41 views
4

我已經開始玩弄的RabbitMQ在AMQP寶石文檔提供RPC sample code,嘗試寫非常簡單的代碼執行同步遠程調用:RPC使用EventMachine的&RabbitMQ的

require "amqp" 

module RPC 
    class Base 
    include EM::Deferrable 

    def rabbit(rabbit_callback) 
     rabbit_loop = Proc.new { 
     AMQP.connect do |connection| 
      AMQP::Channel.new(connection) do |channel| 
      channel.queue("rpc.queue", :exclusive => false, :durable => true) do |requests_queue| 
       self.callback(&rabbit_callback) 
       self.succeed(connection, channel, requests_queue) 
      end # requests_queue 
      end # AMQP.channel 
     end # AMQP.connect 

     Signal.trap("INT") { connection.close { EM.stop } } 
     Signal.trap("TERM") { connection.close { EM.stop } } 
     } 

     if !EM.reactor_running? 
     EM.run do 
      rabbit_loop.call 
     end 
     else 
     rabbit_loop.call 
     end 
    end 
    end 

    class Server < Base 

    def run 
     server_loop = Proc.new do |connection, channel, requests_queue| 
     consumer = AMQP::Consumer.new(channel, requests_queue).consume 
     consumer.on_delivery do |metadata, payload| 
      puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..." 
      channel.default_exchange.publish(Time.now.to_s, 
              :routing_key => metadata.reply_to, 
              :correlation_id => metadata.message_id, 
              :mandatory  => true) 
      metadata.ack 
     end 
     end 
     rabbit(server_loop) 
    end 

    end 

    class Client < Base 

    def sync_push(request) 
     result = nil 
     sync_request = Proc.new do |connection, channel, requests_queue| 
     message_id = Kernel.rand(10101010).to_s 

     response_queue = channel.queue("", :exclusive => true, :auto_delete => true) 
     response_queue.subscribe do |headers, payload| 
      if headers.correlation_id == message_id 
      result = payload 
      connection.close { EM.stop } 
      end 
     end 

     EM.add_timer(0.1) do 
      puts "[request] Sending a request...#{request} with id #{message_id}" 
      channel.default_exchange.publish(request, 
              :routing_key => requests_queue.name, 
              :reply_to => response_queue.name, 
              :message_id => message_id) 
     end 
     end 

     rabbit(sync_request) 
     result 
    end 
    end 
end 

我們的想法很簡單:我希望有一個消息隊列隨時準備好(這是通過rabbit方法處理的)。每當客戶想要發送一個請求時,它首先創建一個響應的臨時隊列以及一個消息ID;它然後將該請求發佈到主消息隊列,並等待臨時隊列中具有相同消息ID的響應,以便知道何時針對該特定請求的答案准備就緒。我猜message_id與臨時隊列有點多餘(因爲隊列也應該是唯一的)。

我現在用的這個客戶端/服務器代碼

# server session 
>> server = RPC::Server.new 
=> #<RPC::Server:0x007faaa23bb5b0> 
>> server.run 
Updating client properties 
[requests] Got a request 3315740. Sending a reply to amq.gen-QCv8nP2dI5Qd6bg2Q1Xhk0... 

# client session 
>> client = RPC::Client.new 
=> #<RPC::Client:0x007ffb6be6aed8> 
>> client.sync_push "test 1" 
Updating client properties 
[request] Sending a request...test 1 with id 3315740 
=> "2012-11-02 21:58:45 +0100" 
>> client.sync_push "test 2" 
AMQ::Client::ConnectionClosedError: Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007ffb6b9c83d0 @payload="\x002\x00\n\x00\x00\x00\f\x00\x00\x00\x00", @channel=1> 

有兩點上運行的虛擬腳本,我真不明白:

  1. 相關EventMachine的:在Client的代碼中,爲什麼我必須打電話EM.add_timer,如果我想讓我的消息真正成爲發表?爲什麼使用EM.next_tick不起作用?我的理解是,在這裏發佈發佈時,「一切」應該是「準備好」的。
  2. 與AMQP相關:爲什麼我的客戶端因第二個請求的關閉連接而崩潰?每次推送新請求時,都應創建一個全新的EM/AMQP循環。

很遺憾的是很少有代碼可用在線處理EM/AMQP,所以任何幫助將深表感謝! 關於這個效率的任何評論也將不勝感激。

回答

3

挖掘文檔,我終於發現我實際上需要once_declared回調來確保當客戶端開始使用它時隊列已準備就緒。

關於連接問題,似乎在某種程度上使用EM::Deferrable會導致問題,所以(非常不令人滿意的)解決方案簡單地包含在不包括EM::Deferrable中。

require "amqp" 

module RPC 

    module Base 

    def rabbit(rabbit_callback) 
     rabbit_loop = Proc.new { 
     AMQP.start do |connection| 
      AMQP::Channel.new(connection) do |channel| 
      channel.queue("rpc.queue", :exclusive => false, :durable => true) do |requests_queue| 
       requests_queue.once_declared do 
       rabbit_callback.call(connection, channel, requests_queue) 
       end 
      end 
      end 
     end 

     Signal.trap("INT") { AMQP.stop { EM.stop } } 
     Signal.trap("TERM") { AMQP.stop { EM.stop } } 
     } 

     if !EM.reactor_running? 
     @do_not_stop_reactor = false 
     EM.run do 
      rabbit_loop.call 
     end 
     else 
     @do_not_stop_reactor = true 
     rabbit_loop.call 
     end 
    end 
    end 

    class Server 
    include Base 

    def run 
     server_loop = Proc.new do |connection, channel, requests_queue| 
     consumer = AMQP::Consumer.new(channel, requests_queue).consume 
     consumer.on_delivery do |metadata, payload| 
      puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..." 
      channel.default_exchange.publish(Time.now.to_s, 
              :routing_key => metadata.reply_to, 
              :correlation_id => metadata.message_id, 
              :mandatory  => true) 
      metadata.ack 
     end 
     end 
     rabbit(server_loop) 
    end 

    end 

    class Client 
    include Base 

    def sync_push(request) 
     result = nil 
     sync_request = Proc.new do |connection, channel, requests_queue| 
     message_id = Kernel.rand(10101010).to_s 

     response_queue = channel.queue("", :exclusive => true, :auto_delete => true) 
     response_queue.subscribe do |headers, payload| 
      if headers.correlation_id == message_id 
      result = payload 
      AMQP.stop { EM.stop unless @do_not_stop_reactor } 
      end 
     end 

     response_queue.once_declared do 
      puts "[request] Sending a request...#{request} with id #{message_id}" 
      channel.default_exchange.publish(request, 
              :routing_key => requests_queue.name, 
              :reply_to => response_queue.name, 
              :message_id => message_id) 
     end 
     end 

     rabbit(sync_request) 
     result 
    end 
    end 
end 
相關問題