2017-09-18 117 views
0

使用Ruby的「bunny」RabbitMQ客戶端,我希望我的生產者(某些Ruby代碼)向消費者(使用「sneakers」gem)的工作者發送消息,並且我希望我的生產者不要執行另一行Ruby代碼,直到生產者收到消費者收到我的消息並做了一些工作的確認。等待消息生產者收到消費者完成的工作確認

在我的消費者中,我正在做一些工作,然後調用運動鞋'ack!方法來確認郵件已收到並且工作已完成。

在我的製片人,我對我的Bunny::Channel實例調用confirm_select付諸確認碼,並publish後-ing我的短信,我打電話wait_for_confirms通道上理應等到我所有的消息都被ack!由-ed消費者。 (我嘗試過在兔子文檔中發現的東西here。)

但是,似乎我的製作人並沒有等待消費者撥打ack!。我正在登錄我的製片人和我的消費者,並發現我的製作人似乎認爲在消費者實際承認他們之前已經確認了消息。

如何讓RabbitMQ生產者等到消費者在Ruby中完成其工作?

Ruby 2.3.3,RabbitMQ 3.6.12,Erlang 17.3。

這裏是我的鎖文件:

GEM 
    specs: 
    amq-protocol (2.2.0) 
    bunny (2.7.0) 
     amq-protocol (>= 2.2.0) 
    concurrent-ruby (1.0.5) 
    serverengine (1.5.11) 
     sigdump (~> 0.2.2) 
    sigdump (0.2.4) 
    sneakers (2.6.0) 
     bunny (~> 2.7.0) 
     concurrent-ruby (~> 1.0) 
     serverengine (~> 1.5.11) 
     thor 
    thor (0.20.0) 

PLATFORMS 
    ruby 

DEPENDENCIES 
    bunny 
    sneakers 

BUNDLED WITH 
    1.14.6 

這裏是我的消費/工人(consumer_worker.rb):

class ConsumerWorker 
    include Sneakers::Worker 

    from_queue 'do-work-here', 
      exchange: 'do-work-here', 
      exchange_type: :direct, 
      durable: true, 
      prefetch: 1, 
      arguments: { 
       :'x-dead-letter-exchange' => 'do-work-here-retry' 
      }, 
      timeout_job_after: 5, 
      retry_timeout: 60000, 
      ack: true 

    def work(msg) 
    open('ruby-debug.log', 'a') do |f| 
     f.puts "message received: #{msg}" 
    end 
    sleep 1 
    open('ruby-debug.log', 'a') do |f| 
     f.puts "acknowledging message at: #{Time.now.to_i}" 
    end 
    ack! 
    open('ruby-debug.log', 'a') do |f| 
     f.puts "acknowledged message at: #{Time.now.to_i}" 
    end 
    end 
end 

在一個終端選項卡,我正在這名工人用:

bundle exec sneakers work ConsumerWorker --require consumer_worker.rb 

這是我的出版商(publisher.rb):

require 'bunny' 
connection = Bunny.new('amqp://guest:[email protected]:5672').tap(&:start) 
channel = connection.create_channel 
channel.confirm_select 
queue = channel.queue('do-work-here', 
         {arguments: {:'x-dead-letter-exchange' => 'do-work-here-retry'}, 
         durable: true}) 
queue.publish('hello world', persistent: true) 
channel.wait_for_confirms 
open('ruby-debug.log', 'a') do |f| 
    f.puts "messages confirmed at: #{Time.now.to_i}" 
end 

當我在另一個選項卡中運行以下命令:

ruby ./publisher.rb 

然後我的日誌文件(./ruby-debug.log)包含以下行:

message received: hello world 
messages confirmed at: 1505774819 
acknowledging message at: 1505774820 
acknowledged message at: 1505774820 

我想是對的順序事件如下:

message received 
acknowledging message 
acknowledged message 
messages confirmed 

我如何解決這個問題?

+0

我認爲我對出版商如何確認的理解應該是有效的是錯誤的。請參閱https://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2014-January/033269.html。本教程似乎可能與解決我的問題有關:https://www.rabbitmq.com/tutorials/tutorial-six-ruby.html – Jackson

回答

0

發佈者確認僅涵蓋發佈者與RabbitMQ的溝通。出版商不知道消費者。

如果請求/響應模式爲 ​​,請參閱教程6。

ConditionVariable是一個常用的併發原語,用於延遲進一步的操作,直到發生事件,並有可選的超時。

發佈商確認和消費者確認記錄在http://www.rabbitmq.com/confirms.htm

+0

任何想法如何執行RPC與運動鞋? – Jackson

相關問題