2012-06-30 19 views
3

對此可能有一個愚蠢的簡單答案,但我試圖使用ActiveMQ在生產者和消費者之間傳遞消息。我會有很多製作人和很多消費者,但我希望每個消息在消費者中只傳遞一次。這似乎意味着我無法使用主題,因爲它們會將消息傳遞給正在監聽的所有消費者,並且我只希望一位消費者接收每條消息。無法讓ActiveMQ出列

我的問題是我能夠接收消息,但消息沒有出列。所以如果我重新啓動消費者流程,所有的消息都會被重新處理。 This answer似乎是相關的,但似乎不適用,因爲我不能創建持久化隊列使用者,只有持久主題使用者(除非我在API文檔中缺少某些內容)。

我的代碼如下。

TopicConnectionFactory factory = new ActiveMQConnectionFactory(props.getProperty("mq.url")); 
Connection conn = factory.createConnection(); 
Session session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); 
Queue queue = session.createQueue(props.getProperty("mq.source_queue")); 
conn.start(); 
MessageConsumer consumer = session.createConsumer(queue); 

再後來就

Message msg = consumer.receive(); 
msg.acknowledge(); 
if (!(msg instanceof TextMessage)) continue; 
String msgStr = ((TextMessage)msg).getText(); 

這是我當前的代碼。我嘗試過使用Session.AUTO_ACKNOWLEDGE而沒有msg.acknowledge()。此代碼的任何工作排列似乎都會檢索郵件,但是當我重新啓動消費者時,即使在重新啓動之前收到所有郵件,也會再次收到所有郵件。

回答

5

您將會話創建爲交易會話,因此如果要通知代理所有消息現在已消耗且不需要重新遞送,則需要調用session.commit。如果您沒有將createSession的第一個參數設置爲true,那麼Ack模式會受到尊重,否則將被忽略,這是JMS API的一個古怪的問題。如果你這樣做:

ConnectionFactory factory = new ActiveMQConnectionFactory(props.getProperty("mq.url")); 
Connection conn = factory.createConnection(); 
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
Queue queue = session.createQueue(props.getProperty("mq.source_queue")); 
conn.start(); 
MessageConsumer consumer = session.createConsumer(queue); 

那麼這會工作:

Message msg = consumer.receive(); 
msg.acknowledge(); 

否則,你需要做的:

Message msg = consumer.receive(); 
session.commit(); 

但是請記住,單條信息交易不真正有意義的客戶確認沒有交易是一個更好的選擇。

+0

將其更改爲conn.createSession(false,Session.CLIENT_ACKNOWLEDGE);工作。謝謝。我認爲這很簡單,我錯過了。 –