2017-04-19 53 views
1

阻止了其他郵件,我們實施了,我們要等待響應特定的用例分佈式請求/響應類型的架構。我們使用的JMS代理是ActiveMq,代碼使用Spring連接在一起。使用Spring JmsTemplate的/ ActiveMQ的使用請求/應答,而不在隊列中

我們看到的問題是,它看來,如果發送了一堆請求到相同的目的地,任何請求,比如,需要的時間顯著量來完成,塊跟在它後面的請求消息。消費者使用的SessionAwareMessageListener接口僅支持onMessage()方法。在這裏實現並行性的最佳方式是什麼,即如果一個特定的請求需要很長時間,隊列中的其他消息不應該被阻塞?

有此SO後,但它並沒有回答我的問題。 JMS: Can we get multiple messages from queue in OnMessage() withtout commit or rollback

感謝

相關的代碼片段(異常處理等,爲簡潔,刪除)

生產者

public class MyJmsProducer { 

private ProcessingResponse sendMessage(final Serializable serializable) { 
    //send JMS request and wait for response 
    return jmsMessagingTemplate.convertSendAndReceive(destination, serializable, ProcessingResponse.class); //this operation seems to be blocking + sync 
    } 
} 

與聽者(消費者)

public class MyJmsListener 
    implements SessionAwareMessageListener<Message>, NotificationHandler<Task> { 

@Override 
public void onMessage(Message message, Session session) 
     throws JMSException { 
    ProcessingRequest processingRequest = (ProcessingRequest) ((ObjectMessage) message).getObject(); 

    // handle the request here (THIS COULD TAKE A WHILE) 
    handleRequest(processingRequest); 


    // done handling the request, now create a response message 
    final ObjectMessage responseMessage = new ActiveMQObjectMessage(); 
    responseMessage.setJMSCorrelationID(message.getJMSCorrelationID()); 
    responseMessage.setObject(processingResponse); 

    // Message sent back to the replyTo address of the income message. 
    final MessageProducer producer = session.createProducer(message.getJMSReplyTo()); 
    producer.send(responseMessage); 

    } 
} 
+0

肯定,你可以增加併發用戶數,但如果handleRequest方法從消耗大量的系統資源,在每個請求可能會增加一般平均時間。 – user1516873

+0

如果交不解決您的問題,那麼你真正的問題是不明確的,你的準確有問題,而您發送回覆經紀人即'生產商。發送(responseMessage);'?您是否認爲製作人提出請求並將其發送給經紀人等待消費者消費並完成? – hagrawal

回答

2

可以使用該DMLC的增加消息的消耗速度和解決速度慢的消費問題:

@Bean 
public DefaultMessageListenerContainer dmlc() { 
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer(); 
    dmlc.setMaxConcurrentConsumers(10); 
    dmlc.setConcurrentConsumers(5); 
    return dmlc; 
} 

你需要適應的prefetchPolicy併發消費者:

persistent queues (default value: 1000) 
non-persistent queues (default value: 1000) 
persistent topics (default value: 100) 
non-persistent topics (default value: Short.MAX_VALUE - 1) 

所有消息被分派到第一所連接的消費者當另外一個連接到相同的目的地,他沒有收到消息,因此要改變這種行爲,你需要prefetchPolicy設置爲默認的較低值。例如,這jms.prefetchPolicy.queuePrefetch=1添加到URI配置在activemq.xml中或將其設置在客戶端的URL等推薦用於高 消息量高性能的這個

@Bean 
public ConnectionFactory connectionFactory() { 
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
      "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1"); 
    return connectionFactory; 
} 

預取大值。但是,對於消息量較小的消息,每個消息需要很長時間才能處理,應將預取設置爲1. 這可確保消費者一次只處理一條消息。 然而,指定預取限制爲零將導致消費者 輪詢一次一個消息,而不是推送給消費者的消息是 。

看看http://activemq.apache.org/what-is-the-prefetch-limit-for.html

而且

http://activemq.apache.org/destination-options.html

相關問題