我正在使用spring-rabbitmq,並且我可以成功獲取消息。 但是當我調試時,我發現監聽器創建一個線程,它會每1秒詢問 消息。我認爲速率太高,我想要做的是將速率設置爲1分鐘或任何其他。 我搜索了很多,但沒有工作如何設置spring-rabbitmq收聽消息的速率
我springrabbit.xml:
<rabbit:listener-container connection-factory="connectionFactory" message-converter="jsonMessageConverter" >
<rabbit:listener queues="notification" ref="messageReceiver"/>
</rabbit:listener-container>
我的Java代碼:
@Override
public void onMessage(Message message) { System.out.println("messagebody: "+new String(message.getBody()));
LOGGER.info(dateFormatUtil.getDateFormat(new Date())+new String(message.getBody()));
boolean result=false;
SendSingleEmailService sendSingleEmailService = new SendSingleEmailService();
try {
result =sendSingleEmailService.send(new String(message.getBody()));
} catch (FileNotFoundException e) {
LOGGER.error(dateFormatUtil.getDateFormat(new Date())+"[NOTIFICATION] [ERROR] message is null!");
e.printStackTrace();
}
if(!result) {
try{
throw new Exception();
}catch (FileNotFoundException e) {
throw new RuntimeException(e);
}catch (Exception e) {
throw new RuntimeException(e);
}finally {
LOGGER.error(dateFormatUtil.getDateFormat(new Date())+"[NOTIFICATION] [ERROR] Send Email failed!");
}
}
}
是一些調試結果如下:
[2017-08-16 18:23:08,595]DEBUG 4286[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:09,600]DEBUG 5291[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:10,602]DEBUG 6293[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:11,603]DEBUG 7294[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:12,609]DEBUG 8300[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:13,612]DEBUG 9303[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:14,615]DEBUG 10306[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:15,617]DEBUG 11308[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:16,618]DEBUG 12309[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:17,619]DEBUG 13310[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
謝謝,我閱讀文檔並嘗試[receiveTimeout],然後我發現它只在我收到消息時纔會發生,否則聽衆仍會每隔1秒向rabbitmq發送消息。我想設置監聽器每隔1分鐘詢問一次消息。執行此操作嗎?謝謝。 –
shawn
請參閱編輯我的答案。 –
omg,我誤解了它。謝謝,我再次嘗試receiveAndConvert(),它的工作原理。 – shawn