2011-12-20 90 views
2

我有一個擁有持久訂閱者的主題。我可以發佈和使用這些消息,但是我發現在閱讀主題消息時會有一些延遲。從JMS消費消息時的延遲主題

我無法在一次調用中讀取消息。我需要多次調用該方法來讀取消息。我錯過了什麼?

private void publishMessage() { 
     TopicConnection topicConnection = null; 
     TopicSession topicSession = null; 
     TopicPublisher topicPublisher = null; 
     try { 
      topicConnection = connectionFactory.createTopicConnection(); 
      topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); 
      Topic topicName= topicSession.createTopic(topicName); 
      topicPublisher = topicSession.createPublisher(topicName); 
      ObjectMessage message = topicSession.createObjectMessage(customObject) 
      message.setStringProperty("user", userProperty); 
      topicPublisher.publish(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, timeToLive); 
     } catch (JMSException e) { 
      throw new RuntimeException("Error Sending UMessage", e); 
     } finally { 
      closeConnections(null, topicPublisher, topicSession, topicConnection); 
     } 
    } 

public void consumeMessages(String userId, int maxResults) { 
    TopicConnection topicConnection = null; 
    TopicSession topicSession = null; 
    TopicSubscriber topicSubscriber = null; 

    try { 
     topicConnection = connectionFactory.createTopicConnection("guest","guest"); 
     topicConnection.setClientID("topic"); 
     topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); 
     Topic topicName= topicSession.createTopic(topicName); 
     topicSubscriber = topicSession.createDurableSubscriber(topicName, "subscriptionname", String.format("user = '%s'", userName), false); 
     topicConnection.start(); 
     Message msg = null; 

     do { 
     msg = topicSubscriber.receiveNoWait(); 
     if (msg instanceof ObjectMessage) { 
      ObjectMessage om = (ObjectMessage) msg; 
     else { 
      log.error(String.format(" %s", om.getObject().getClass().getSimpleName())); 
      } 
     } else if (msg != null) { 
      log.error(String.format("e %s", msg.getClass().getSimpleName())); 
     } 
     } while (msg != null && out.size() <= maxResults); 
    } catch (JMSException e) { 
     throw new RuntimeException("Error retrieving User Messages", e); 
    } finally { 
     closeConnections(topicSubscriber, null, topicSession, topicConnection); 
    } 
    return out; 
} 

回答

0

要調用receiveNoWait(),這將同時檢索一個消息。根據您的JMS提供程序,通常會發生的情況是JMS客戶端將一次檢索多條消息並將其緩存在客戶端以減少網絡延遲。當你調用接收時,它會從這個緩存中獲取消息並提供給你。

如果您看到分鐘很長的延遲,那麼在將這些消息放入主題時如何出錯或者在處理每封消息時阻止了您的消息接收。如果您不想在處理時考慮實現MessageListener接口而不是使用receive方法,或者您可以從receive方法獲取消息並異步地在線程池中處理它們,您不希望阻止收到消息。

當您創建消費者可以添加監聽器,如下:

MessageListener listener = new MyListener(); 
consumer.setMessageListener(listener); 

然後創建一個類來處理消息或實現該接口在現有的消費類:

public class MyListener implements MessageListener { 
    public void onMessage(Message message) 
    { 
    TextMessage text = (TextMessage) message; 

    System.out.println("Message: " + text.getText()); 
    } 
} 
+0

是,我想檢索訂戶的所有可用消息。這就是爲指定數量的消息循環的原因。它正在閱讀所有信息,但不是在發佈後立即發佈,需要5-6分鐘(有時甚至更多)。我觀察到一個奇怪的行爲,如果我在調試模式下運行我的jboss,並且如果我有一些斷點,它會立即讀取消息。但是,如果在正常模式下運行,則需要一些時間來閱讀。 – John 2011-12-20 22:44:29

+0

既然你提到過JBoss,我假設你在應用服務器內部使用JBoss Messaging作爲你的JMS提供者? – gregwhitaker 2011-12-20 22:49:45

+0

是的,你是對的。我正在使用JBOSS提供的JMS服務器。我是否缺少配置? – John 2011-12-20 22:57:50