2017-02-26 121 views
0

我正在使用下面的代碼爲多個使用者創建多個JMS會話以使用消息。我的問題是代碼以單線程方式運行。即使消息存在於隊列中,第二個線程也無法接收任何內容,只是保持輪詢。第一個線程同時完成第一批處理並返回並消耗剩餘的消息。這裏的用法有什麼問題嗎?多線程JMS客戶端ActiveMQ

static { 
    try { 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616"); 
     connection = connectionFactory.createConnection(); 
     connection.start(); 
    } catch (JMSException e) { 
     LOGGER.error("Unable to initialise JMS Queue.", e); 
    } 

} 

public JMSClientReader(boolean isQueue, String name) throws QueueException { 

    init(isQueue,name); 
} 

@Override 
public void init(boolean isQueue, String name) throws QueueException 
{ 

    // Create a Connection 
    try { 
     // Create a Session 
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     if (isQueue) { 
      destination = new ActiveMQQueue(name);// session.createQueue("queue"); 
     } else { 
      destination = new ActiveMQTopic(name);// session.createTopic("topic"); 
     } 
     consumer = session.createConsumer(destination); 
    } catch (JMSException e) { 
     LOGGER.error("Unable to initialise JMS Queue.", e); 
     throw new QueueException(e); 
    } 
} 

public String readQueue() throws QueueException { 

    // connection.setExceptionListener(this); 
    // Wait for a message 
    String text = null; 
    Message message; 
    try { 
     message = consumer.receive(1000); 
     if(message==null) 
      return "done"; 
     if (message instanceof TextMessage) { 
      TextMessage textMessage = (TextMessage) message; 
      text = textMessage.getText(); 
      LOGGER.info("Received: " + text); 
     } else { 
      throw new JMSException("Invalid message found"); 
     } 
    } catch (JMSException e) { 
     LOGGER.error("Unable to read message from Queue", e); 
     throw new QueueException(e); 
    } 


    LOGGER.info("Message read is " + text); 
    return text; 

} 
+0

如果你想多消費者的消費信息,而不是使用隊列主題看一看。 http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html – Rjiuk

+0

在我的情況下,我只是試圖增加偵聽隊列的偵聽器的數量。目前不想使用主題。 – HariJustForFun

回答

4

你的問題是prefetchPolicy。

persistent queues (default value: 1000) 
non-persistent queues (default value: 1000) 
persistent topics (default value: 100) 
non-persistent topics (default value: Short.MAX_VALUE - 1) 

所有消息被派往第一所連接的消費者,當另外一個連接,他沒有收到郵件,因此,如果你有需要prefetchPolicy設定的較低值的隊列同時消費者改變這種行爲比默認值。例如,這jms.prefetchPolicy.queuePrefetch=1添加到URI配置在activemq.xml中或將其設置在客戶端的URL等推薦用於高 消息量高性能的這個

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616?jms.prefetchPolicy.queuePrefetch=1"); 

預取大值。但是,對於消息量較小的消息,每個消息需要很長時間才能處理,預取應設置爲1. 這確保消費者一次只處理一條消息。 但是,指定預取限制爲零將導致消費者 輪詢一次一個消息,而不是將消息推送給使用者,該消息是 。

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

而且

http://activemq.apache.org/destination-options.html

+0

它的工作。謝謝你的出色答案。這讓我瘋了。 – HariJustForFun