2011-04-15 111 views
8

我有一個場景,我需要非常快速地分發和處理作業。我將在隊列中快速填充大約45個作業,並且可以同時處理大約20個作業(5臺機器,每個4個核心)。每個工作都需要花費不同的時間,並且使垃圾收集變得複雜是一個問題,所以我需要能夠讓消費者脫機進行垃圾收集。訂閱隊列,收到1封郵件,然後取消訂閱

目前,我有一切工作與流行(每消費者每5ms彈出)。這似乎是不可取的,因爲它轉換爲rabbitmq每秒600個流行請求。

我會喜歡,如果它有一個彈出命令,就像訂閱,但只有一個消息。 (這個過程會阻塞,等待來自rabbitMQ連接的輸入,通過類似於Kernel.select的東西)

我試圖欺騙AMQP gem來做這樣的事情,但它不工作:我無法取消訂閱,直到隊列爲空,並且沒有更多消息正被髮送給消費者。我擔心的其他取消訂閱方法將會丟失信息。

consume_1.rb:

require "amqp" 

EventMachine.run do 
    puts "connecting..." 
    connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/") 
    puts "Connected to AMQP broker" 

    channel = AMQP::Channel.new(connection) 
    queue = channel.queue("tasks", :auto_delete => true) 
    exchange = AMQP::Exchange.default(channel) 

    queue.subscribe do |payload| 
    puts "Received a message: #{payload}." 
    queue.unsubscribe { puts "unbound" } 
    sleep 3 
    end 
end 

consumer_many.rb:

require "amqp" 

# Imagine the command is something CPU - intensive like image processing. 
command = "sleep 0.1" 

EventMachine.run do 
    puts "connecting..." 
    connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/") 
    puts "Connected to AMQP broker" 

    channel = AMQP::Channel.new(connection) 
    queue = channel.queue("tasks", :auto_delete => true) 
    exchange = AMQP::Exchange.default(channel) 

    queue.subscribe do |payload| 
    puts "Received a message: #{payload}." 
    end 
end 

producer.rb:

require "amqp" 

i = 0 
EventMachine.run do 
    connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/") 
    puts "Connected to AMQP broker" 

    channel = AMQP::Channel.new(connection) 
    queue = channel.queue("tasks", :auto_delete => true) 
    exchange = AMQP::Exchange.default(channel) 

    EM.add_periodic_timer(1) do 
    msg = "Message #{i}" 
    i+=1 
    puts "~ publishing #{msg}" 
    end 
end 

我會推出consume_many.rb和producer.rb。消息將按預期流動。

當我啓動consume_1.rb時,它會獲取所有其他消息(如預期的那樣)。但它從來沒有取消訂閱,因爲它從來沒有完成處理所有的消息......所以它就這樣了。

如何獲取consume_1.rb訂閱隊列,獲取單個消息,然後將自己從負載平衡器環中取出,以便它可以完成工作,而不會丟失任何可能存在的額外待處理作業隊列,否則將被安排發送到過程?

+0

編寫一個表現這種方式的服務器在ruby中會很愚蠢......也許使用zero-mq來實現我自己的代理是解決方案嗎? – 2011-04-15 23:22:47

回答

13

這是一個簡單的,但記錄很差的AMQP寶石的功能,你需要的是這樣的:

在您的消費:

channel = AMQP::Channel.new(connection, :prefetch => 1) 

然後用你的訂閱塊,做:

queue.subscribe(:ack => true) do |queue_header, payload| 
    puts "Received a message: #{payload}." 
    # Do long running work here 

    # Acknowledge message 
    queue_header.ack 
end 

這是什麼告訴RabbitMQ只發送您的消費者1消息,並且在長時間運行一段時間後,在隊列標題上調用ack之前不發送其他消息。

我可能需要糾正這一點,但我相信direct交換將更適合這項任務。

+0

伊凡,你是個天才。我一直在尋找這個好幾個月了。謝謝! – 2011-04-16 04:22:13

+0

我一點也不感到驚訝,因爲在閱讀源代碼之前,我已經把頭撞到桌子上好幾個月了。 – Ivan 2011-04-16 04:30:32

+0

伊萬 - 我不認爲交換的類型在這裏真的很重要。他的問題是在隊列級別,因爲聽起來他希望多個消費者從同一個隊列中工作,每個人輪流抽取一個工作項並處理它。因此,使用的交換類型並不重要,因爲一旦將消息插入到隊列中,交換基本上不在圖片中。 – 2011-04-17 05:05:22

1

隨着我的環境,

RabbitMQ版本:3.3。3

AMQP寶石版本:1.5.0

從伊萬該溶液仍然導致所有的消息被從隊列中取出。

相反,訂閱隊列的未確認消息的數量可以通過設置信道的QoS來限制。

按AMQP ::通道的API文檔,對於該方法

#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object 

一個說明,如果你是2.3.6版本,prefetch_size的運行後的RabbitMQ服務器已被棄用。

channel = AMQP::Channel.new(connection, :prefetch => 1) 
channel.qos(0, 1) 

queue = channel.queue(queue_name, :auto_delete => false) 
queue.subscribe(:ack => true) do |metadata, payload| 
    puts "Received a message: #{payload}." 

    # Do long running work here 

    # Acknowledge message 
    metadata.ack 
end 

希望解決方案可以幫助別人。

乾杯。

+0

這條線有訣竅。謝謝。 'channel.qos(0,1)' – mipmip 2015-09-16 22:13:28