2011-10-05 42 views
3

作爲一個更大的應用程序的一部分,我必須設置跨多個工作人員的傳出請求的基本速率限制。這背後的想法很簡單:通過發佈帶有「立即」標誌的「令牌」消息,如果沒有人等待它,則自動丟棄該消息。通過讓工作人員在發送傳出請求之前僅訂閱令牌隊列,令牌不會「保存」,並且每個令牌僅可用於一次。我覺得這很優雅。如何讓此AMQP單消息訂閱者保持穩定?

不幸的是,添加和刪除用戶並不完全穩定。我已經在https://gist.github.com/1263921/ebdafa067ca09514183d3fc5d6e43c7094fc2733上設置了一個完整的示例。代碼如下:

require 'bundler' 
Bundler.setup 

require 'amqp' 

puts "single-message consumer listening to rapid producer" 

QUEUE_NAME = 'test.rapid-queue-unsubscription' 
PRODUCE_RATE = 1.0/10 
CONSUME_RATE = 1.0/9 

def start_producer 
    exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "") 

    n = 0 
    EM::PeriodicTimer.new(PRODUCE_RATE) do 
    message = "msg #{n}" 
    exchange.publish(message, 
        :immediate => true, # IMPORTANT, messages are dropped if nobody listening now 
        :routing_key => QUEUE_NAME) 
    puts "> PUT #{message}" 
    n += 1 
    end 
end 

def start_consumer 

    EM::PeriodicTimer.new(CONSUME_RATE) do 

    started = Time.now 
    AMQP::Channel.new do |channel_consumer| 
     channel_consumer.prefetch(1) 
     tick_queue = channel_consumer.queue(QUEUE_NAME) 

     consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true) 
     consumer.on_delivery do |_, message| 

     took = Time.now - started 
     puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]" 

     consumer.cancel 
     channel_consumer.close 
     end 
     consumer.consume 
    end 
    end 
end 

EM.run do 
    EM.set_quantum(50) 

    start_producer 
    start_consumer 
end 

運行這個例子幾分鐘結束了兩個錯誤之一垂死:

  1. amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in <class:Consumer>': undefined method `handle_delivery' for nil:NilClass (NoMethodError)

  2. amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in `send_frame': Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0 @payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02", @channel=1> (AMQ::Client::ConnectionClosedError)

的第一個錯誤是由於用戶已被刪除,bu消息仍然傳遞給它,並且庫永遠不會期望發生這種情況。第二個錯誤來自發布商,突然間有一個關閉的連接。

我錯過了什麼使其始終按預期工作?使用

版本:

  • OS X 10.7.1
  • 紅寶石1.9.2p312(2011-08-11的修訂32926)[x86_64的-darwin11.1.0]
  • 的RabbitMQ 2.6.1

的Gemfile:

source 'http://rubygems.org' 

gem 'amqp' 

Gemfile.lock的:

GEM 
    remote: http://rubygems.org/ 
    specs: 
    amq-client (0.8.3) 
     amq-protocol (>= 0.8.0) 
     eventmachine 
    amq-protocol (0.8.1) 
    amqp (0.8.0) 
     amq-client (~> 0.8.3) 
     amq-protocol (~> 0.8.0) 
     eventmachine 
    eventmachine (0.12.10) 

PLATFORMS 
    ruby 

DEPENDENCIES 
    amqp 
    eventmachine 

回答

2

從#rabbitmq通道(AMQP作者antares_):只使用一個單一的通道,它會正常工作。稍微改變,但穩定版本:

require 'bundler' 
Bundler.setup 

require 'amqp' 

puts "single-message consumer listening to rapid producer" 

QUEUE_NAME = 'test.rapid-queue-unsubscription' 
PRODUCE_RATE = 1.0/10 
CONSUME_RATE = 1.0/9 

def start_producer channel 
    exchange = AMQP::Exchange.new(channel, :direct, "") 

    n = 0 
    EM::PeriodicTimer.new(PRODUCE_RATE) do 
    message = "msg #{n}" 
    exchange.publish(message, 
        :immediate => true, # IMPORTANT, messages are dropped if nobody listening now 
        :routing_key => QUEUE_NAME) 
    puts "> PUT #{message}" 
    n += 1 
    end 
end 

def start_consumer channel 
    EM::PeriodicTimer.new(CONSUME_RATE) do 

    started = Time.now 
    tick_queue = channel.queue(QUEUE_NAME) 

    consumer = AMQP::Consumer.new(channel, tick_queue, nil, exclusive = false, no_ack = true) 
    consumer.on_delivery do |_, message| 

     took = Time.now - started 
     puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]" 

     consumer.cancel do 
     puts "< GET #{message} (CANCEL DONE)" 
     end 
    end 
    consumer.consume 
    end 
end 

EM.run do 
    EM.set_quantum(50) 

    AMQP::Channel.new do |channel| 
    start_producer channel 
    end 

    AMQP::Channel.new do |channel| 
    channel.prefetch(1) 
    start_consumer channel 
    end 

end 
+0

在你的答案你說「只使用一個通道」,但你仍然創建兩個?我不明白...我得到這個錯誤,雖然我只創建了一個通道。 –

+0

通道在週期性定時器循環之外聲明並重複用於每個滴答。製片人和消費者確實有一個渠道,但這對於快樂是必要的。 –