2017-08-16 76 views
2

我正在使用spring-rabbitmq,並且我可以成功獲取消息。 但是當我調試時,我發現監聽器創建一個線程,它會每1秒詢問 消息。我認爲速率太高,我想要做的是將速率設置爲1分鐘或任何其他。 我搜索了很多,但沒有工作如何設置spring-rabbitmq收聽消息的速率

我springrabbit.xml:

<rabbit:listener-container connection-factory="connectionFactory" message-converter="jsonMessageConverter" > 
     <rabbit:listener queues="notification" ref="messageReceiver"/> 
    </rabbit:listener-container> 

我的Java代碼:

@Override 
     public void onMessage(Message message) { System.out.println("messagebody: "+new String(message.getBody())); 
      LOGGER.info(dateFormatUtil.getDateFormat(new Date())+new String(message.getBody())); 
      boolean result=false; 
      SendSingleEmailService sendSingleEmailService = new SendSingleEmailService(); 
      try { 
       result =sendSingleEmailService.send(new String(message.getBody())); 
      } catch (FileNotFoundException e) { 
       LOGGER.error(dateFormatUtil.getDateFormat(new Date())+"[NOTIFICATION] [ERROR] message is null!"); 
       e.printStackTrace(); 
      } 
      if(!result) { 
       try{ 
        throw new Exception(); 
       }catch (FileNotFoundException e) { 
        throw new RuntimeException(e); 
       }catch (Exception e) { 
        throw new RuntimeException(e); 
       }finally { 
        LOGGER.error(dateFormatUtil.getDateFormat(new Date())+"[NOTIFICATION] [ERROR] Send Email failed!"); 
       } 
      } 


     } 

是一些調試結果如下:

[2017-08-16 18:23:08,595]DEBUG 4286[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:09,600]DEBUG 5291[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:10,602]DEBUG 6293[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:11,603]DEBUG 7294[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:12,609]DEBUG 8300[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:13,612]DEBUG 9303[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:14,615]DEBUG 10306[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:15,617]DEBUG 11308[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:16,618]DEBUG 12309[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:17,619]DEBUG 13310[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 

回答

0

您可以增加receiveTimeout - 請參見Message Listener Container Configuration

但是,容器對stop()請求的響應較少。

我認爲你過分關注輪詢速率 - 輪詢用於階段遞送消息的內部隊列的開銷很小。

如果只是要刪除(在調試時)日誌「噪音」設置org.springframework.amqp.rabbit.listener.BlockingQueueConsumer日誌類別INFOWARN

即將發佈的2.0版本有一個新的DirectMessageListenerContainer,它不輪詢內部隊列並消除此問題。 Info here

編輯

聽衆仍然要求的RabbitMQ的消息每1秒

如果你仍然看到調試消息每隔1秒,你有沒有配置正確receiveTimeout;它不是「詢問rabbitmq」的消息,線程在等待receiveTimeout(並且發現兔子沒有發送新消息)後醒來,因此它可以對stop()作出反應;然後它再次睡覺,直到有新消息到達或再次超時。如果沒有消息可用,則不會與代理進行交互 - 消息由代理推送。

也許你誤解了偵聽器容器的用途。它用於消息驅動的應用程序 - 不能「減慢」消息到達的速度 - 它們是由代理推動的。

如果您希望每分鐘只收到一條消息,則應該使用RabbitTemplate (或receiveAndConvert())方法而不是消息偵聽器容器。

+0

謝謝,我閱讀文檔並嘗試[receiveTimeout],然後我發現它只在我收到消息時纔會發生,否則聽衆仍會每隔1秒向rabbitmq發送消息。我想設置監聽器每隔1分鐘詢問一次消息。執行此操作嗎?謝謝。 – shawn

+0

請參閱編輯我的答案。 –

+0

omg,我誤解了它。謝謝,我再次嘗試receiveAndConvert(),它的工作原理。 – shawn