作爲一個更大的應用程序的一部分,我必須設置跨多個工作人員的傳出請求的基本速率限制。這背後的想法很簡單:通過發佈帶有「立即」標誌的「令牌」消息,如果沒有人等待它,則自動丟棄該消息。通過讓工作人員在發送傳出請求之前僅訂閱令牌隊列,令牌不會「保存」,並且每個令牌僅可用於一次。我覺得這很優雅。如何讓此AMQP單消息訂閱者保持穩定?
不幸的是,添加和刪除用戶並不完全穩定。我已經在https://gist.github.com/1263921/ebdafa067ca09514183d3fc5d6e43c7094fc2733上設置了一個完整的示例。代碼如下:
require 'bundler'
Bundler.setup
require 'amqp'
puts "single-message consumer listening to rapid producer"
QUEUE_NAME = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9
def start_producer
exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "")
n = 0
EM::PeriodicTimer.new(PRODUCE_RATE) do
message = "msg #{n}"
exchange.publish(message,
:immediate => true, # IMPORTANT, messages are dropped if nobody listening now
:routing_key => QUEUE_NAME)
puts "> PUT #{message}"
n += 1
end
end
def start_consumer
EM::PeriodicTimer.new(CONSUME_RATE) do
started = Time.now
AMQP::Channel.new do |channel_consumer|
channel_consumer.prefetch(1)
tick_queue = channel_consumer.queue(QUEUE_NAME)
consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true)
consumer.on_delivery do |_, message|
took = Time.now - started
puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"
consumer.cancel
channel_consumer.close
end
consumer.consume
end
end
end
EM.run do
EM.set_quantum(50)
start_producer
start_consumer
end
運行這個例子幾分鐘結束了兩個錯誤之一垂死:
amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in <class:Consumer>': undefined method `handle_delivery' for nil:NilClass (NoMethodError)
amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in `send_frame': Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0 @payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02", @channel=1> (AMQ::Client::ConnectionClosedError)
的第一個錯誤是由於用戶已被刪除,bu消息仍然傳遞給它,並且庫永遠不會期望發生這種情況。第二個錯誤來自發布商,突然間有一個關閉的連接。
我錯過了什麼使其始終按預期工作?使用
版本:
- OS X 10.7.1
- 紅寶石1.9.2p312(2011-08-11的修訂32926)[x86_64的-darwin11.1.0]
- 的RabbitMQ 2.6.1
的Gemfile:
source 'http://rubygems.org'
gem 'amqp'
Gemfile.lock的:
GEM
remote: http://rubygems.org/
specs:
amq-client (0.8.3)
amq-protocol (>= 0.8.0)
eventmachine
amq-protocol (0.8.1)
amqp (0.8.0)
amq-client (~> 0.8.3)
amq-protocol (~> 0.8.0)
eventmachine
eventmachine (0.12.10)
PLATFORMS
ruby
DEPENDENCIES
amqp
eventmachine
在你的答案你說「只使用一個通道」,但你仍然創建兩個?我不明白...我得到這個錯誤,雖然我只創建了一個通道。 –
通道在週期性定時器循環之外聲明並重複用於每個滴答。製片人和消費者確實有一個渠道,但這對於快樂是必要的。 –