AMQP/RabbitMQ-Server/EventMachine with PhusionPassenger/Rails only receives the first message
I use AMQP/RabbitMQ for my Ruby on Rails application.
I put the following amqp.rb file in config/initializers (copied and adapted from the recipe at http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs):
require 'amqp'

# References:
# 1. Getting Started with AMQP and Ruby
#    http://rubyamqp.info/articles/getting_started/
# 2. EventMachine and Rails
#    http://www.hiringthing.com/2011/11/04/eventmachine-with-rails.html#sthash.iqCWUtOn.dpbs
# 3. Connecting to the broker, integrating with Ruby on Rails, Merb and Sinatra
#    http://rubyamqp.info/articles/connecting_to_broker/
module AppEventMachine
  def self.start
    if defined?(PhusionPassenger)
      Rails.logger.info "###############################################################################"
      Rails.logger.info "Running EventMachine/Rails with PhusionPassenger ......"
      Rails.logger.info "###############################################################################"
      PhusionPassenger.on_event(:starting_worker_process) do |forked|
        # => for passenger, we need to avoid orphaned threads
        if forked && EventMachine.reactor_running?
          EventMachine.stop
        end
        spawn_eventmachine_thread
        die_gracefully_on_signal
      end
    else
      Rails.logger.info "###############################################################################"
      Rails.logger.info "PhusionPassenger is not running. Probably you are running Rails locally ......"
      Rails.logger.info "###############################################################################"
      # facilitates debugging
      Thread.abort_on_exception = true
      # just spawn a thread and start it up
      spawn_eventmachine_thread unless defined?(Thin)
      # Thin is built on EventMachine, doesn't need this thread
    end
  end

  def self.spawn_eventmachine_thread
    Thread.new {
      EventMachine.run do
        AMQP.channel ||= AMQP::Channel.new(AMQP.connect(:host => '127.0.0.1')) # Q_SERVER, :user=> Q_USER, :pass => Q_PASS, :vhost => Q_VHOST))
        AMQP.channel.on_error(&method(:handle_channel_exception))
        AMQP.channel.queue(MixpanelJob::QUEUE_NAME, :exclusive => true)
          .subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) }
      end
    }
  end

  def self.handle_channel_exception(channel, channel_close)
    Rails.logger.error "###############################################################################"
    Rails.logger.error "Oops... a channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
    Rails.logger.error "###############################################################################"
  end

  def self.die_gracefully_on_signal
    Signal.trap("INT") {
      Rails.logger.error "###############################################################################"
      Rails.logger.error "Stopping the EventMachine ......"
      EventMachine.stop
      Rails.logger.error "###############################################################################"
    }
    Signal.trap("TERM") {
      Rails.logger.error "###############################################################################"
      Rails.logger.error "Stopping the EventMachine ......"
      EventMachine.stop
      Rails.logger.error "###############################################################################"
    }
  end
end

AppEventMachine.start
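After that, I started Rails with PhusionPassenger and saw that it was indeed running under PhusionPassenger. Then I tried sending messages to the queue, but to my surprise, in this line: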
.subscribe { |metadata, payload| MixpanelJob::handle_sending(metadata, payload) }
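the subscribe handler is executed only once. Only the first message is received; for any later message (the second, the third, and so on) the subscribe handler is never called.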
This can happen if your job fails. Try replacing MixpanelJob::handle_sending(metadata, payload) with "puts payload" and see whether it works then. – Joshua
Oh yes, you are right. It looks like my 'MixpanelJob::handle_sending' is failing, and I need to handle that. Hmm, the strange thing is that 'Mixpanel' never raises an error.
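One way to act on the diagnosis in the comments is to rescue exceptions inside the handler, so that a failure on one message is logged rather than silently ending the thread that runs the reactor. The sketch below is plain Ruby and does not touch RabbitMQ: `SafeSubscriber` is a hypothetical helper name (it is not part of the amqp gem), and `flaky_job` is a stand-in for `MixpanelJob.handle_sending` that deliberately fails on the first payload.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper (not part of the amqp gem): wraps a message handler so
# that an exception raised for one message is logged instead of propagating.
module SafeSubscriber
  def self.wrap(logger, &handler)
    lambda do |metadata, payload|
      begin
        handler.call(metadata, payload)
      rescue StandardError => e
        logger.error("subscribe handler failed: #{e.class}: #{e.message}")
      end
    end
  end
end

# Log to an in-memory buffer here; in the Rails app this would be Rails.logger.
log_output = StringIO.new
logger = Logger.new(log_output)
processed = []

# Stand-in for MixpanelJob.handle_sending that fails on the first message only.
flaky_job = lambda do |_metadata, payload|
  raise ArgumentError, "cannot process #{payload}" if payload == "first"
  processed << payload
end

handler = SafeSubscriber.wrap(logger, &flaky_job)

# Simulate three deliveries; the failing first one no longer blocks the rest.
%w[first second third].each { |payload| handler.call(nil, payload) }
```

In the initializer, the same idea would amount to calling such a wrapped handler from inside the subscribe block. Logging the exception class and message also makes it visible why the job fails, which the original setup under Passenger did not reveal.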