我使用ActiveMQ作爲JMS代理和客戶,jmsTemplate發送消息,1個非持久主題。在高峯時間,我有〜100信息/秒。ActiveMQ上的重複郵件
隊列中有多少條消息並不重要,但我經常會得到重複的消息。我想到的臨時解決方案是在表上設置索引 - 目前所有的消息都只保存在數據庫中。
我的第一個問題 - 爲什麼消息是重複的,如果我指定非持久主題和響應不是必需的?
發件人:
@Component
public class QueueSender
{
private Logger log = Logger.getLogger(getClass());
@Autowired
protected JmsTemplate jmsTemplate;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
@Autowired
public QueueSender(final JmsTemplate jmsTemplate)
{
this.jmsTemplate = jmsTemplate;
this.jmsTemplate.setDeliveryPersistent(false);
System.out.println("isSessionTransacted "+jmsTemplate.isSessionTransacted()+
" getDeliveryMode "+jmsTemplate.getDeliveryMode()+
" getReceiveTimeout "+jmsTemplate.getReceiveTimeout()+
" getSessionAcknowledgeMode "+jmsTemplate.getSessionAcknowledgeMode());
}
public void sendPrice(Integer tickerId, Integer field, Double price, Long timestamp)
{
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
jmsTemplate.setMessageIdEnabled(true);
Map <String, Object>map = new HashMap<String, Object>();
map.put("tickerId", tickerId);
map.put("field", field);
map.put("price", price);
map.put("timestamp", timestamp);
jmsTemplate.convertAndSend("Quotez", map);
}
public void sendVolume(Integer tickerId, Integer field, Integer size, Long timestamp)
{
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Map <String, Object>map = new HashMap<String, Object>();
map.put("tickerId", tickerId);
map.put("field", field);
map.put("size", size);
map.put("timestamp", timestamp);
jmsTemplate.convertAndSend("Quotez", map);
}
}
監聽器:
public void onMessage(Message message)
{
if (message instanceof MapMessage)
{
try
{
MapMessage mapMessage = (MapMessage) message;
if(null != mapMessage.getString("price"))
{
priceService.insert(mapMessage.getInt("tickerId"),mapMessage.getDouble("price"),
mapMessage.getInt("field"),mapMessage.getLong("timestamp"));
} else{
volumeService.insert(mapMessage.getInt("tickerId"),mapMessage.getInt("size"),
mapMessage.getInt("field"),mapMessage.getLong("timestamp"));
}
}
catch (final JMSException e)
{
exceptionListener.onException(e);
}
}
}
春:
<amq:broker useJmx="true" persistent="false">
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0"/>
</amq:transportConnectors> </amq:broker>
<amq:topic id="topicDest" physicalName="Quotez"/>
<amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost?jms.watchTopicAdvisories=false"/>
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="jmsFactory" />
<property name="exceptionListener" ref="jmsExceptionListener" />
<property name="sessionCacheSize" value="100" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"/>
<property name="pubSubDomain" value="true"/>
<property name="defaultDestinationName" value="Quotez"/>
</bean>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="topicDest"/>
<property name="messageListener" ref="jdbcListener" />
</bean>
第二個問題是關於jmsContainer配置。上面的代碼和下面的代碼有什麼區別?上面的代碼給了我主題作爲訂戶和下面的代碼給我隊列。
<jms:listener-container concurrency="10" connection-factory="connectionFactory">
<jms:listener id="JdbcListener" destination="topicDest" ref="queueListener" />
</jms:listener-container>
我發現,駱駝和其idempotentConsumer想解決重複的問題 - 當然,這將是很好知道爲什麼它排在首位發生。第三個問題涉及駱駝的配置。我有這個配置(默認):
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="tcp://localhost:0"/>
</bean>
<bean id="myRepo" class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<idempotentConsumer messageIdRepositoryRef="myRepo">
<header>messageId</header>
<to uri="mock:result"/>
</idempotentConsumer>
</route>
</camelContext>
它適用於所有隊列還是應該明確訂閱?我想它會檢查每個主題/隊列和所有傳入的消息。目前的問題是,所有消息都有messageId = null,並且過濾器將其作爲參數。
2011-03-01 11:24:09,152 DEBUG (org.springframework.jms.core.JmsTemplate:567) - Sending created message: ActiveMQMapMessage {commandId = 0, responseRequired = false, **messageId = null**, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {field=1, timestamp=1298975049138, price=72.89, tickerId=2} }
我沒有找到設置messageId的簡單方法。我的問題 - 是否足以設置messageId,它將作爲例外或配置出現問題,例如我必須指定將使用哪個主題。
感謝,
Dzidas
我建議去問問ActiveMQ用戶郵件列表,並檢查現有的線程。你可能會在那裏找到一些幫助和答案。 http://activemq.apache.org/mailing-lists.html – 2011-03-10 14:41:12