2015-04-28 41 views
0

我使用的地方生產者發佈使用以下策略事件持久主題:JMS的ActiveMQ的主題緩慢消費者

<bean id="jmsTemplateESB" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory"  ref="cachedJmsConnectionFactory" /> 
    <property name="defaultDestination" ref="activeMQTopic" /> 
    <!-- Value = javax.jms.DeliveryMode.PERSISTENT --> 
    <property name="deliveryMode" value="2" /> 
    <!-- Value = javax.jms.Session.AUTO_ACKNOWLEDGE --> 
    <property name="sessionAcknowledgeMode" value="1" /> 
    <!-- Needs to be true for the deliveryMode to work --> 

    <property name="explicitQosEnabled" value="true" /> 
    </bean> 

我使用以下的消費設置:使用下面的聽衆

public static void listenOnTopic(String topicName, MessageListener listener) 
    throws Exception 
    { 
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress); 
    Connection con = factory.createConnection(); 


    con.setClientID("Consumer"); 
    Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    Topic topic = session.createTopic(topicName); 
    TopicSubscriber subscriber = session.createDurableSubscriber(topic, listener.getClass().getName()); 

    subscriber.setMessageListener(listener); 

    con.start(); 
    } 

public class ActiveMQMessageListener implements MessageListener 
{ 
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageListener.class); 

@Autowired 
@Qualifier("jmsEventOutPutChannel") 
MessageChannel outputChannel; 

@Override 
public void onMessage(Message message) { 
    try { 
     BytesMessage bytesMessage= (BytesMessage) message; 
     byte[] data = new byte[(int)bytesMessage.getBodyLength()]; 
     bytesMessage.readBytes(data); 
     org.springframework.integration.Message<byte[]> outputMessage = MessageBuilder.withPayload(data).build(); 
     outputChannel.send(outputMessage); 
    } catch (JMSException e) { 
     e.printStackTrace(); 
     LOG.error("Error while retrieving events from ActiveMQ ",e); 
    } 
} 
} 

以下彈出設置爲輸出通道

<bean id="callerBlockPolicy" class="org.springframework.integration.util.CallerBlocksPolicy"> 
    <constructor-arg type="long" value="10000"></constructor-arg> 
</bean> 

<bean id="jmsListnerTaskExecutor" 
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
    <property name="corePoolSize" value="${CORE_POOL_SIZE}"></property> 
    <property name="maxPoolSize" value="${THREAD_POOL_SIZE_JMS_LISTENER}"></property> 
    <property name="queueCapacity" value="${QUEUE_SIZE_JMS_LISTENER}"></property> 
    <property name="rejectedExecutionHandler" ref="callerBlockPolicy"></property> 
    <property name="waitForTasksToCompleteOnShutdown" value="true"></property> 
</bean> 

<int:channel id="jmsEventOutPutChannel"> 
    <int:dispatcher task-executor="jmsListnerTaskExecutor" /> 
</int:channel> 

此消費者代碼太慢,因此我們無法從主題中高速檢索消息。其實,如果沒有圖片中的「jmsEventOutPutChannel」,我會獲得9500 qps的速率,但在圖片中使用「jmsEventOutPutChannel」,我們的速度在150ps左右的時候會變得非常低。

任何人都可以有任何提示我做錯了這段代碼?

我的「jmsEventOutPutChannel」頻道代碼是否也會影響activeMQ的消費率?

回答

0

真的不是你的消費者代碼是問題所在,但是當把消息發送到輸出通道時有些問題。

重點在這裏,看看爲什麼消息需要這麼長的時間寫入ActiveMQ。首先,我會嘗試使它非持久性(但仍然持久),並查看它的行爲是否有所不同。這可能是因爲ActiveMQ服務器配置不正確,並且寫入後端存儲的效率不高(可能Kahadb無法跟上?)

是否有可能生產者正在爲每個發送的消息創建一個連接,開銷正在殺死你?

您可能會發布您的ActiveMQ網址,不知道您添加了哪些參數可能會有所幫助。但看到它降低這一點顯然是不好的。