2011-03-01 56 views
3

我使用ActiveMQ作爲JMS代理和客戶,jmsTemplate發送消息,1個非持久主題。在高峯時間,我有〜100信息/秒。ActiveMQ上的重複郵件

隊列中有多少條消息並不重要,但我經常會得到重複的消息。我想到的臨時解決方案是在表上設置索引 - 目前所有的消息都只保存在數據庫中。

我的第一個問題 - 爲什麼消息是重複的,如果我指定非持久主題和響應不是必需的?

發件人:

@Component 
public class QueueSender 
{ 
    private Logger log = Logger.getLogger(getClass()); 
@Autowired 
    protected JmsTemplate jmsTemplate; 


    public JmsTemplate getJmsTemplate() { 
     return jmsTemplate; 
    } 

    public void setJmsTemplate(JmsTemplate jmsTemplate) { 
     this.jmsTemplate = jmsTemplate; 
    } 

    @Autowired 
    public QueueSender(final JmsTemplate jmsTemplate) 
    { 
     this.jmsTemplate = jmsTemplate; 
     this.jmsTemplate.setDeliveryPersistent(false); 
     System.out.println("isSessionTransacted "+jmsTemplate.isSessionTransacted()+ 
       " getDeliveryMode "+jmsTemplate.getDeliveryMode()+ 
       " getReceiveTimeout "+jmsTemplate.getReceiveTimeout()+ 
       " getSessionAcknowledgeMode "+jmsTemplate.getSessionAcknowledgeMode()); 
    } 


    public void sendPrice(Integer tickerId, Integer field, Double price, Long timestamp) 
    { 
     jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
     jmsTemplate.setMessageIdEnabled(true); 
     Map <String, Object>map = new HashMap<String, Object>(); 
     map.put("tickerId", tickerId); 
     map.put("field", field); 
     map.put("price", price); 
     map.put("timestamp", timestamp); 
     jmsTemplate.convertAndSend("Quotez", map); 
    } 

    public void sendVolume(Integer tickerId, Integer field, Integer size, Long timestamp) 
    { 
     jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 

     Map <String, Object>map = new HashMap<String, Object>(); 
     map.put("tickerId", tickerId); 
     map.put("field", field); 
     map.put("size", size); 
     map.put("timestamp", timestamp); 
     jmsTemplate.convertAndSend("Quotez", map); 

    } 

} 

監聽器:

public void onMessage(Message message) 
{ 
    if (message instanceof MapMessage) 
    {   
     try 
     { 
      MapMessage mapMessage = (MapMessage) message; 
       if(null != mapMessage.getString("price")) 
       { 
priceService.insert(mapMessage.getInt("tickerId"),mapMessage.getDouble("price"), 
mapMessage.getInt("field"),mapMessage.getLong("timestamp")); 
       }      else{ 
volumeService.insert(mapMessage.getInt("tickerId"),mapMessage.getInt("size"), 
mapMessage.getInt("field"),mapMessage.getLong("timestamp")); 
      } 
     } 
     catch (final JMSException e) 
     { 
      exceptionListener.onException(e); 
     } 
    } 
} 

春:

<amq:broker useJmx="true" persistent="false"> 
<amq:transportConnectors> 
    <amq:transportConnector uri="tcp://localhost:0"/> 
</amq:transportConnectors> </amq:broker> 
<amq:topic id="topicDest" physicalName="Quotez"/> 
    <amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost?jms.watchTopicAdvisories=false"/> 
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 
<constructor-arg ref="jmsFactory" /> 
<property name="exceptionListener" ref="jmsExceptionListener" /> 
<property name="sessionCacheSize" value="100" /> 
</bean> 


<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <constructor-arg ref="connectionFactory"/> 
    <property name="pubSubDomain" value="true"/> 
<property name="defaultDestinationName" value="Quotez"/>  
</bean> 
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
     <property name="connectionFactory" ref="connectionFactory"/> 
     <property name="destination" ref="topicDest"/> 
     <property name="messageListener" ref="jdbcListener" /> 
    </bean> 

第二個問題是關於jmsContainer配置。上面的代碼和下面的代碼有什麼區別?上面的代碼給了我主題作爲訂戶和下面的代碼給我隊列。

<jms:listener-container concurrency="10" connection-factory="connectionFactory">  
<jms:listener id="JdbcListener" destination="topicDest" ref="queueListener" /> 
</jms:listener-container> 

我發現,駱駝和其idempotentConsumer想解決重複的問題 - 當然,這將是很好知道爲什麼它排在首位發生。第三個問題涉及駱駝的配置。我有這個配置(默認):

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
<property name="brokerURL" value="tcp://localhost:0"/> 
</bean> 

<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/> 

<camelContext xmlns="http://camel.apache.org/schema/spring"> 
<route> 
    <from uri="direct:start"/> 
    <idempotentConsumer messageIdRepositoryRef="myRepo"> 
     <header>messageId</header> 
     <to uri="mock:result"/> 
    </idempotentConsumer> 
</route> 
</camelContext> 

它適用於所有隊列還是應該明確訂閱?我想它會檢查每個主題/隊列和所有傳入的消息。目前的問題是,所有消息都有messageId = null,並且過濾器將其作爲參數。

2011-03-01 11:24:09,152 DEBUG (org.springframework.jms.core.JmsTemplate:567) - Sending created message: ActiveMQMapMessage {commandId = 0, responseRequired = false, **messageId = null**, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {field=1, timestamp=1298975049138, price=72.89, tickerId=2} } 

我沒有找到設置messageId的簡單方法。我的問題 - 是否足以設置messageId,它將作爲例外或配置出現問題,例如我必須指定將使用哪個主題。

感謝,

Dzidas

+0

我建議去問問ActiveMQ用戶郵件列表,並檢查現有的線程。你可能會在那裏找到一些幫助和答案。 http://activemq.apache.org/mailing-lists.html – 2011-03-10 14:41:12

回答

4

使用JMS主題時,你需要設置併發/最大併發用戶爲「1」或者你會得到重複。如果您需要多線程消耗和/或負載平衡,請改爲使用virtual topics