1
A
回答
2
使用DefaultMessageListenerContainer。您可以向其註冊偵聽器並異步使用消息。點擊此鏈接獲取有關調整MessageListenerContainer的更多信息:http://bsnyderblog.blogspot.se/2010/05/tuning-jms-message-consumption-in.html。
HornetQ的依賴條件,你需要(我用了一個獨立的文件hornetq-2.3.0.CR2)(你還需要一些春瓶):
<dependencies>
<!-- hornetq -->
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.7.Final</version>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-jms-client</artifactId>
<version>2.3.0.CR2</version>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-core-client</artifactId>
<version>2.3.0.CR2</version>
</dependency>
<!-- hornetq -->
</dependencies>
的豆你應該在你的applicationContext.xml使用(我沒「T使用JNDI爲獲得ConnectionFactory和目的地;對於這一點,你可以按照this question):
<!-- It's ConnectionFactory to connect to hornetq. 5445 is hornetq acceptor port -->
<bean name="connectionFactory" class="messaging.jms.CustomHornetQJMSConnectionFactory">
<constructor-arg index="0" name="ha" value="false" />
<constructor-arg index="1" name="commaSepratedServerUrls" value="127.0.0.1:5445" />
</bean>
<bean id="destinationParent" class="messaging.jms.JmsDestinationFactoryBean" abstract="true">
<property name="pubSubDomain" value="false" /> <!-- default is queue -->
</bean>
<bean id="exampleDestination" parent="destinationParent">
<property name="destinationName" value="example" /> <!-- queue name -->
</bean>
<!-- MessageListener -->
<bean id="messageHandler" class="messaging.consumer.MessageHandler">
</bean>
<!-- MessageListenerContainer -->
<bean id="paymentListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="destination" ref="exampleDestination" />
<property name="messageListener" ref="messageHandler" />
<property name="connectionFactory" ref="connectionFactory" />
<property name="sessionTransacted" value="true" />
<property name="concurrentConsumers" value="1" />
<property name="maxConcurrentConsumers" value="10" />
<property name="idleConsumerLimit" value="2" />
<property name="idleTaskExecutionLimit" value="5" />
<property name="receiveTimeout" value="3000" />
</bean>
CustomHornetQJMSConnectionFactory:
public class CustomHornetQJMSConnectionFactory extends org.hornetq.jms.client.HornetQJMSConnectionFactory
{
private static final long serialVersionUID = 1L;
public CustomHornetQJMSConnectionFactory(boolean ha, String commaSepratedServerUrls)
{
super(ha, converToTransportConfigurations(commaSepratedServerUrls));
}
public static TransportConfiguration[] converToTransportConfigurations(String commaSepratedServerUrls)
{
String [] serverUrls = commaSepratedServerUrls.split(",");
TransportConfiguration[] transportconfigurations = new TransportConfiguration[serverUrls.length];
for(int i = 0; i < serverUrls.length; i++)
{
String[] urlParts = serverUrls[i].split(":");
HashMap<String, Object> map = new HashMap<String,Object>();
map.put(TransportConstants.HOST_PROP_NAME, urlParts[0]);
map.put(TransportConstants.PORT_PROP_NAME, urlParts[1]);
transportconfigurations[i] = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
}
return transportconfigurations;
}
}
JmsDestinationFactoryBean(在destinationParent使用Bean):
public class JmsDestinationFactoryBean implements FactoryBean<Destination>
{
private String destinationName;
private boolean pubSubDomain = false;
public void setDestinationName(String destinationName) {
this.destinationName = destinationName;
}
public void setPubSubDomain(boolean pubSubDomain) {
this.pubSubDomain = pubSubDomain;
}
@Override
public Class<?> getObjectType()
{
return Destination.class;
}
@Override
public boolean isSingleton()
{
return true;
}
@Override
public Destination getObject() throws Exception
{
if(pubSubDomain)
{
return HornetQJMSClient.createTopic(destinationName);
}
else
{
return HornetQJMSClient.createQueue(destinationName);
}
}
}
的MessageHandler(接收到的信息去onMessage方法的過程)(爲簡單起見,你可以實現一個javax.jms.MessageListener,而不是從SessionAwareMessageListener):
public class MessageHandler implements org.springframework.jms.listener.SessionAwareMessageListener<Message>
{
@Override
public void onMessage(Message msg, Session session) throws JMSException
{
if(msg instanceof TextMessage)
{
System.out.println(((TextMessage)msg).getText());
session.commit();
}
else
{
session.rollback(); // send message back to the queue
}
}
相關問題
- 1. 來自magento的消費者密鑰和消費者密鑰
- 2. 對多個消費者的RPC調用
- 3. 在ActiveMQ中自動刪除不活動的耐用消費者
- 4. Kafka Listener方法未被調用。消費者不消費。
- 5. 來自消費者羣體的所有消費者都會收到消息
- 6. Spring JMS消費者拉動
- 7. odata4j消費者函數調用
- 8. 如何在使用Semphores的生產者 - 消費者中消費?
- 9. 消費者生產者多線程消費者不會消逝
- 10. 自動發送消息給調用者
- 11. 消費者過濾的生產者 - 消費者阻塞隊列
- 12. 生產者/消費者線程中的油門消費者
- 13. Java生產者 - 消費者:生產者不「通知()」消費者
- 14. RabbitMQ消費者
- 15. 消費者池
- 16. ServiceLocator的消費者是否應該調用ServiceLocator.Current?
- 17. RabbitMQ自動消費
- 18. 消費消費使用卡夫卡消費者 - Java
- 19. 消費者在動態創建時不消費消息
- 20. 來自ActiveMQ中消費者的致謝
- 21. 來自ActiveMQ中消費者的致謝
- 22. 來自kafka消費者的InstanceAlreadyExistsException
- 23. 使用新消費者API刪除kafka消費者偏移量
- 24. 當消費者有消息時,RabbitMQ是否會調用回調函數?
- 25. 生產者/消費者
- 26. 消費者和提供者
- 27. 生產者消費者
- 28. POSIX生產者 - 消費者
- 29. Clojure生產者消費者
- 30. LinkedBlockingQueue - 生產者/消費者