2015-08-27 88 views
0

我使用的是amqp.node,我想建立一個系統,每秒從隊列中檢索1條消息,而不管隊列中有多少個。我想檢查隊列大小,然後我從渠道get雖然。我怎麼做?rabbitmq當前的隊列大小

consumer.js

#!/usr/bin/env node 

var amqp = require('amqplib/callback_api') 
amqp.connect('<my_rabbitmq_host_info>', function(err, conn){ 
    conn.createChannel(function(err, ch){ 
    var q = 'message-queue' 
    ch.assertQueue(q, {durable: false}, function(err, queue){ 
     console.log(' [*] waiting for messages in queue: %s -- to exit press ctrl+c', q) 

     setInterval(function(){ 
     getMessage(ch, q, queue) 
     }, 1000) 
    }) 
    }) 
}) 

function getMessage(ch, q, queue){ 
    if(!queue){ 
    return 
    } 

    console.log('queue %s has %d messages in it...', q, queue.messageCount) 
    if(queue.messageCount > 0){ 
    console.log('getting 1 message from queue') 
    ch.get(q, {noAck: false}, function(err, msg){ 
     console.log(' [x] message recieved: %s \n\n', msg.content.toString()) 
     ch.ack(msg) 
    }) 
    } 
} 

每次我得到相同數量的queue.messageCount我以爲因爲我每次發送相同queue實例的功能。我如何獲得目前的queue.messageCount

回答

1

您必須在每次想要獲得當前尺寸時撥打assertQueue。反轉你的代碼,這樣你就可以每隔一次間隔超時而不是隻調用一次。


setInterval(function(){ 

    ch.assertQueue(q, {durable: false}, function(err, queue){ 
    console.log(' [*] waiting for messages in queue: %s -- to exit press ctrl+c', q) 
    getMessage(ch, q, queue) 
    }); 

}, 1000) 

也 - 這對我來說似乎是一個壞主意。我會建議一種不同的方法。爲消費者設置the prefetch limit爲1。那麼你的代碼一次只能提取一條消息。

也,這可能幫助:http://dougbarth.github.io/2011/06/10/keeping-the-rabbit-on-a-leash.html - 不同的語言,但其概念應該翻譯成節點

+0

感謝德里克。我能夠實現令牌桶並在示例應用程序中使用預取,就像這樣。工作完美! – rkstar