您應該在Channel.basicConsume和DefaultConsumer抽象類看一看:https://www.rabbitmq.com/api-guide.html#consuming
Java併發需要回調來處理每個消息線程,但你可以使用一個線程池來重用線程。
static final ExecutorService threadPool;
static {
threadPool = Executors.newCachedThreadPool();
}
現在,你需要創建一個消費者,將通過創建將被傳遞到線程池來執行一個Runnable實例句柄每次交貨。
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
final byte msgBody = body; // a 'final' copy of the body that you can pass to the runnable
final long msgTag = envelope.getDeliveryTag();
Runnable runnable = new Runnable() {
@Override
public void run() {
// handle the message here
doStuff(msgBody);
channel.basicAck(msgTag, false);
}
};
threadPool.submit(runnable);
}
});
這說明了如何在單個線程的單一連接和頻道不while循環將被阻止在每個交貨處理併發交付。爲了您的理智,您可能希望將您的Runnable
實現分解到它自己的類中,該類可以接受channel
,msgBody
,msgTag
和任何其他數據作爲調用run()
方法時可訪問的參數。
非常感謝您的幫助。 – crawlero 2014-10-12 22:24:32