2017-02-27 73 views
0

我試圖從rabbitMQ隊列獲取所有消息。從rabbitMQ獲取所有消息

const messages = await rabbit.getMessages(outputQueue, false); 

這裏是getMessages方法的實現。問題是它只處理3-5條消息並調用'resolve'。一段時間後,它處理休息消息,但'解決'已被調用,並且不能再次執行。

const amqp = require('amqplib'); 
. 
. 
let amqpUrl; 
let queueConf; 

const init = (connection, queue) => { 
    amqpUrl =`amqp://${connection.user}:${connection.password}@${connection.host}:${connectio n.port}`; 
    if (connection.vhost) { 
amqpUrl = `amqp://${connection.user}:${connection.password}@${connection.host}:${connection.port}/${connection.vhost}`; 
    } 
    queueConf = queue; 
} 

const getChannel =() => new Promise((resolve) => { 
    amqp.connect(amqpUrl).then((conn) => { 
    conn.createChannel().then((ch) => { 
     ch.prefetch(1000).then(() => resolve(ch)) 
    }) 
    }) 
}) 

module.exports = (connection, queue) => { 
    init(connection, queue); 
    return { 
    getMessages: (queueName, cleanQueue) => new Promise((resolve) => { 
     let messages = []; 
     let i = 1; 
     getChannel().then((ch) => { 
     ch.consume(queueName, (msg) => { 
      messages.push(msg); 
      console.log(msg.content.toString()) 
     }, { noAck: cleanQueue }).then(() => { 
      logger.info(`Retreived ${messages.length} messages from ${queueName}`); 
      resolve(messages) 
     }) 
     }) 
    }) 
    . 
    . 
    }; 
    }; 

在此先感謝!

+1

證明,承諾不適用於所有的解決方案 - 並承諾絕對不適合這樣的事情 –

回答

1

你可以這樣做,但它會非常慢,並且如果消息被添加到隊列中的速度比消耗速度快,它可能永遠不會解決。從本質上講,你一直在同一時間得到一個消息,直到channel.get()解決了false,而不是消息對象:

getMessages: (queueName, cleanQueue) => { 
    let messages = [] 
    let i = 1 
    return getChannel().then(function getMessage (ch) { 
    return ch.get(queueName, { noAck: cleanQueue }).then((msg) => { 
     if (msg) { 
     messages.push(msg) 
     return getMessage(ch) 
     } else { 
     logger.info(`Retrieved ${messages.length} messages from ${queueName}`) 
     return messages 
     } 
    }) 
    }).catch((err) => { 
    err.consumedMessages = messages 
    return Promise.reject(err) 
    }) 
} 
相關問題