2016-09-09 67 views
1

我使用Spring 4.x的DefaultJmsListenerContainerFactory來連接到ActiveMQ隊列,使用@JmsListener處理來自該隊列的消息,然後將消息推送到同一ActiveMQ代理上的主題。DefaultJmsListenerContainerFactory和併發連接不關閉

我爲消費者/監聽者和生產者使用單個緩存連接工廠,並且我將緩存使用者設置爲false,這樣我就可以緩存生產者,而不是使用者。我還將併發性設置爲1-3,我預計在應用程序啓動時隊列中將有至少1個消費者,並且隨着消息增加,消費者數量將達到3.但是,消息越來越少,我預計消費者的數量也會回落到1。但是,如果我看看線程(defaultmessagelistenercontainer-2/3),它們處於等待狀態,並且它們不關閉。當負載消退時,預計消費者的數量是否也會關閉,這不是預期的行爲嗎?請參閱下面的配置,讓我知道這種行爲是不是開箱即用,並且如果我需要添加一些內容,以便按照上面的說明進行操作。

ApplicationContext.java

@Bean 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 

    factory.setConnectionFactory(connectionFactory()); 
    factory.setConcurrency(environment.getProperty("jms.connections.concurrent")); 
    factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class)); 
    return factory; 
} 

@Bean 
public CachingConnectionFactory connectionFactory(){ 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(environment.getProperty("jms.redelivery.initial-delay", Long.class)); 
    redeliveryPolicy.setRedeliveryDelay(environment.getProperty("jms.redelivery.delay", Long.class)); 
    redeliveryPolicy.setMaximumRedeliveries(environment.getProperty("jms.redelivery.maximum", Integer.class)); 
    redeliveryPolicy.setUseExponentialBackOff(environment.getProperty("jms.redelivery.use-exponential-back-off", Boolean.class)); 
    redeliveryPolicy.setBackOffMultiplier(environment.getProperty("jms.redelivery.back-off-multiplier", Double.class)); 

    ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("jms.queue.username"), environment.getProperty("jms.queue.password"), environment.getProperty("jms.broker.endpoint")); 
    activeMQ.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQ.setPrefetchPolicy(prefetchPolicy()); 

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ); 
    cachingConnectionFactory.setCacheConsumers(environment.getProperty("jms.connections.cache.consumers", Boolean.class)); 
    cachingConnectionFactory.setSessionCacheSize(environment.getProperty("jms.cache.size", Integer.class)); 
    return cachingConnectionFactory; 
} 

@Bean 
public JmsMessagingTemplate jmsMessagingTemplate(){ 
    ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("jms.queue.out")); 

    JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory()); 
    jmsMessagingTemplate.setDefaultDestination(activeMQ); 

    return jmsMessagingTemplate; 
} 

application.properties

jms.connections.concurrent=1-3 
jms.connections.prefetch=1000 
jms.connections.transacted=true 
jms.connections.cache.consumers=false 
jms.redelivery.initial-delay=1000 
jms.redelivery.delay=1000 
jms.redelivery.maximum=5 
jms.redelivery.use-exponential-back-off=true 
jms.redelivery.back-off-multiplier=2 
jms.cache.size=3 
jms.queue.in=in.queue 
jms.queue.out=out.queue 
jms.broker.endpoint=failover:(tcp://localhost:61616) 

回答

1

嘗試設置maxMessagesPerTask > 0

@Bean 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 

    factory.setConnectionFactory(connectionFactory()); 
    factory.setMaxMessagesPerTask(1); 
    factory.setConcurrency(environment.getProperty("jms.connections.concurrent")); 
    factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class)); 
    return factory; 
} 

,你可以參考文檔http://docs.spring.io/spring-framework/docs/4.3.x/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setMaxMessagesPerTask-int-

jms.connections.prefetch=1000意味着如果您有1000條消息在Q上等待,您將只有1個線程開始處理這1000條消息。

例如jms.connections.prefetch=1表示消息將平等地分派給所有可用的線程,但對於此設置maxMessagesPerTask < 0更好,因爲長壽命任務避免了頻繁的線程上下文切換。 http://activemq.apache.org/what-is-the-prefetch-limit-for.html

+0

謝謝!我試了一下,它的工作!然而,當我分析時,我注意到dmlc容器線程不斷被重新創建,假設基於接收嘗試和每個任務的最大消息值。這只是一個觀察,任何人都可以對此發表評論? – jcb

+0

是的,我給了你一個例子,每個任務有1個最大消息,但是1個是我提供的文檔鏈接中說的太低,每個線程在1次嘗試後都會死掉。您需要將max messagespertask增加到10,例如允許10次嘗試,並增加IdleTaskExecutionLimit以允許在達到最大嘗試次數時重新使用線程。它在文檔http://docs.spring.io/spring-framework/docs/4.3.x/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setIdleTaskExecutionLimit-int- –