2011-02-08 53 views
17

有一種方法可以抑制在ActiveMQ服務器上定義的隊列上的重複消息?在JMS/ActiveMQ上避免重複的消息

我嘗試手動定義JMSMessageID(message.setJMSMessageID(「uniqueid」)),但服務器忽略此修改並傳遞帶有內置生成的JMSMessageID的消息。

根據規範,我沒有找到關於如何刪除重複消息的參考。

在HornetQ中,爲了處理這個問題,我們需要在消息定義上聲明HQ特有的屬性org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID。

即:

Message jmsMessage = session.createMessage(); 
String myUniqueID = "This is my unique id"; // Could use a UUID for this 
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID); 

有人知道是否有對應的ActiveMQ類似的解決方案?

回答

7

你應該看看Apache的駱駝,它提供了一個冪等消費組件將與任何JMS提供者的工作,請參閱:http://camel.apache.org/idempotent-consumer.html

使用與ActiveMQ的組件使得使用JMS非常簡單組合,請參閱: http://camel.apache.org/activemq.html

+1

我懷疑這種方法是否能解決我的問題。我只需要在隊列中使用相同的JMSMessageID保留一個消息實例。我需要它作爲一個集合工作。我希望能夠將最新的idem元素從隊列中移除後,使用相同的JMSMessageID放置其他消息。我需要實施它並進行測試。但是,基於在EAI書中描述的Idempotent,我認爲這個概念與我的必要性不符。 BUt,建議的解決方案很好。我會更多地研究它並在這裏評論我的結果。謝謝 – apast 2011-02-11 01:31:58

4

我懷疑ActiveMQ本身是否支持它,但它應該很容易實現冪等消費者。做到這一點的一種方法是在生產者端爲每個消息添加一個唯一的標識符,現在在消費者端使用一個存儲區(db,cache等),可以檢查消息是否在之前被接收,以及繼續根據該檢查進行處理。

我沿同一行看到一個前面的stackoverflow問題 - Apache ActiveMQ 5.3 - How to configure a queue to reject duplicate messages?,這可能也有幫助。

3

有一種方法可以使ActiveMQ根據JMS屬性過濾重複項。它涉及編寫一個Activemq Plugin。將重複消息發送到死信隊列的基本代理過濾器就是這樣的

import java.util.List; 
import java.util.regex.Matcher; 
import java.util.regex.Pattern; 

import org.apache.activemq.broker.Broker; 
import org.apache.activemq.command.Message; 
import org.apache.activemq.command.ActiveMQMessage; 
import org.apache.activemq.broker.BrokerFilter; 
import org.apache.activemq.broker.ConnectionContext; 
import org.apache.activemq.command.ConnectionInfo; 
import org.apache.activemq.broker.ProducerBrokerExchange; 

public class DuplicateFilterBroker extends BrokerFilter { 
    String messagePropertyName; 
    boolean switchValue; 

    public DuplicateFilterBroker(Broker next, String messagePropertyName) { 
     super(next); 
     this.messagePropertyName = messagePropertyName; 
    } 

    public boolean hasDuplicate(String propertyValue){ 
     switchValue = propertyValue; 
     return switchValue; 
    } 

    public void send(ProducerBrokerExchange producerExchange, Message msg) throws Exception { 
     ActiveMQMessage amqmsg = (ActiveMQMessage)msg; 
     Object msgObj = msg.getMessage(); 
     if (msgObj instanceof javax.jms.Message) { 
      javax.jms.Message jmsMsg = (javax.jms.Message) msgObj; 
      if (!hasDuplicate(jmsMsg.getStringProperty(messagePropertyName))) { 
       super.send(producerExchange, msg); 
      } 
      else { 
       sendToDeadLetterQueue(producerExchange.getConnectionContext(), msg); 
      } 
     } 
    } 
} 
+0

絕對是一個非常好的方法來解決這個問題。 – user1052080 2013-01-11 14:05:28

3

現在支持刪除烘焙到ActiveMQ傳輸中的重複消息。請參閱Connection Configuration Guide中的配置值auditDepthauditMaximumProducerNumber

+3

您如何配置這些參數以避免重複? – Thomas 2013-08-26 09:44:20

0

看起來問題中提出的方式,適用於ActiveMQ(2016/12)。請參閱activemq-artemis指南。這要求生產者在消息中設置一個特定的屬性。

Message jmsMessage = session.createMessage(); 
String myUniqueID = "This is my unique id"; // Could use a UUID for this 
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID); 

但是包含屬性的類是不同的: org.apache.activemq.artemis.core.message.impl.HDR_DUPLICATE_DETECTION_ID和屬性值是_AMQ_DUPL_ID