2013-04-11 26 views

回答

2

使用DefaultMessageListenerContainer。您可以向其註冊監聽器並以異步方式消費消息。有關調優MessageListenerContainer的更多信息,請參閱此鏈接:http://bsnyderblog.blogspot.se/2010/05/tuning-jms-message-consumption-in.html

你需要的HornetQ依賴項如下(我使用的是獨立版的hornetq-2.3.0.CR2;你還需要一些Spring的jar包):

<!-- Maven dependencies for a HornetQ JMS client: Netty plus the HornetQ JMS and core client artifacts. -->
<dependencies> 
    <!-- hornetq: begin --> 
    <!-- Netty; the connection factory shown later configures NettyConnectorFactory as its connector -->
    <dependency> 
     <groupId>org.jboss.netty</groupId> 
     <artifactId>netty</artifactId> 
     <version>3.2.7.Final</version> 
    </dependency> 
    <!-- HornetQ JMS client API -->
    <dependency> 
     <groupId>org.hornetq</groupId> 
     <artifactId>hornetq-jms-client</artifactId> 
     <version>2.3.0.CR2</version> 
    </dependency> 
    <!-- HornetQ core client; kept at the same version as the JMS client above -->
    <dependency> 
     <groupId>org.hornetq</groupId> 
     <artifactId>hornetq-core-client</artifactId> 
     <version>2.3.0.CR2</version> 
    </dependency> 
    <!-- hornetq: end --> 
</dependencies> 

你應該在applicationContext.xml中定義的bean如下(我沒有使用JNDI來獲取ConnectionFactory和目的地;如果需要使用JNDI,可以參考this question):

<!-- ConnectionFactory used to connect to HornetQ. 5445 is the HornetQ acceptor port. --> 
<bean name="connectionFactory" class="messaging.jms.CustomHornetQJMSConnectionFactory"> 
    <!-- First constructor argument: the "ha" flag, passed through to the HornetQ superclass constructor -->
    <constructor-arg index="0" name="ha" value="false" /> 
    <!-- Second constructor argument: comma-separated list of host:port server addresses -->
    <constructor-arg index="1" name="commaSepratedServerUrls" value="127.0.0.1:5445" /> 
</bean> 

<!-- Abstract parent bean holding the settings shared by all destinations; concrete destinations inherit from it. -->
<bean id="destinationParent" class="messaging.jms.JmsDestinationFactoryBean" abstract="true"> 
    <property name="pubSubDomain" value="false" /> <!-- false creates a queue; true would create a topic --> 
</bean> 

<!-- Concrete destination: inherits pubSubDomain from destinationParent and only supplies the name. -->
<bean id="exampleDestination" parent="destinationParent"> 
    <property name="destinationName" value="example" /> <!-- name of the queue --> 
</bean> 

<!-- MessageListener: the handler whose onMessage method receives each incoming message. --> 
<bean id="messageHandler" class="messaging.consumer.MessageHandler"> 
</bean> 

<!-- MessageListenerContainer: wires the destination, the listener and the connection factory together. --> 
<!-- The consumer-count, idle-limit and receiveTimeout values below are tuning knobs; see the Spring -->
<!-- DefaultMessageListenerContainer documentation for their exact semantics and units. -->
    <bean id="paymentListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
     <property name="destination"  ref="exampleDestination" /> 
     <property name="messageListener" ref="messageHandler" /> 
     <property name="connectionFactory" ref="connectionFactory" /> 
     <!-- Transacted sessions: the handler shown later calls commit/rollback on the session it is given -->
     <property name="sessionTransacted" value="true" /> 
     <property name="concurrentConsumers" value="1" /> 
     <property name="maxConcurrentConsumers" value="10" /> 
     <property name="idleConsumerLimit"  value="2" /> 
     <property name="idleTaskExecutionLimit" value="5" /> 
     <property name="receiveTimeout"   value="3000" /> 
    </bean> 

CustomHornetQJMSConnectionFactory:

/**
 * HornetQ JMS connection factory that is configured from a single comma-separated
 * list of {@code host:port} server addresses, so it can be declared as a Spring
 * bean using plain string constructor arguments.
 */
public class CustomHornetQJMSConnectionFactory extends org.hornetq.jms.client.HornetQJMSConnectionFactory
{
    private static final long serialVersionUID = 1L;

    /**
     * Creates a connection factory for the given servers.
     *
     * @param ha                      passed through unchanged to the superclass constructor
     * @param commaSepratedServerUrls comma-separated {@code host:port} entries,
     *                                e.g. {@code "127.0.0.1:5445"} or {@code "a:5445, b:5445"}
     * @throws NullPointerException     if {@code commaSepratedServerUrls} is {@code null}
     * @throws IllegalArgumentException if the list is empty or an entry is not of the form {@code host:port}
     */
    public CustomHornetQJMSConnectionFactory(boolean ha, String commaSepratedServerUrls)
    {
        super(ha, converToTransportConfigurations(commaSepratedServerUrls));
    }

    /**
     * Converts a comma-separated list of {@code host:port} entries into one Netty
     * {@link TransportConfiguration} per entry.
     *
     * <p>Whitespace around entries, hosts and ports is ignored. Anything following a
     * second {@code ':'} within an entry is ignored. The port is stored as a string
     * under {@code TransportConstants.PORT_PROP_NAME}.
     *
     * @param commaSepratedServerUrls comma-separated {@code host:port} entries
     * @return one transport configuration per entry, in the order given
     * @throws NullPointerException     if {@code commaSepratedServerUrls} is {@code null}
     * @throws IllegalArgumentException if the list is empty or an entry is not of the form {@code host:port}
     */
    public static TransportConfiguration[] converToTransportConfigurations(String commaSepratedServerUrls)
    {
        if (commaSepratedServerUrls == null)
        {
            throw new NullPointerException("commaSepratedServerUrls must not be null");
        }

        String[] serverUrls = commaSepratedServerUrls.split(",");
        if (serverUrls.length == 0)
        {
            // String.split drops trailing empty strings, so input such as "," ends up here.
            throw new IllegalArgumentException(
                "No server URLs found in '" + commaSepratedServerUrls + "'; expected host:port[,host:port...]");
        }

        TransportConfiguration[] transportconfigurations = new TransportConfiguration[serverUrls.length];
        for (int i = 0; i < serverUrls.length; i++)
        {
            // Trim so that "a:1, b:2" does not produce a host with a leading space.
            String serverUrl = serverUrls[i].trim();
            String[] urlParts = serverUrl.split(":");
            String host = urlParts.length > 0 ? urlParts[0].trim() : "";
            String port = urlParts.length > 1 ? urlParts[1].trim() : "";
            if (host.length() == 0 || port.length() == 0)
            {
                // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException(
                    "Invalid server URL '" + serverUrl + "' in '" + commaSepratedServerUrls
                        + "'; expected host:port");
            }

            HashMap<String, Object> map = new HashMap<String, Object>();
            map.put(TransportConstants.HOST_PROP_NAME, host);
            map.put(TransportConstants.PORT_PROP_NAME, port);
            transportconfigurations[i] = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
        }
        return transportconfigurations;
    }
}

JmsDestinationFactoryBean(供destinationParent這個bean使用):

/**
 * Spring {@code FactoryBean} that produces a HornetQ JMS {@code Destination}:
 * a topic when {@code pubSubDomain} is {@code true}, otherwise a queue.
 */
public class JmsDestinationFactoryBean implements FactoryBean<Destination>
{
    /** Selects topic ({@code true}) or queue ({@code false}, the default). */
    private boolean pubSubDomain;

    /** Name handed to the HornetQ client when the destination is created. */
    private String destinationName;

    /**
     * Builds the destination through {@code HornetQJMSClient}.
     *
     * @return a queue unless {@code pubSubDomain} is set, in which case a topic
     */
    @Override
    public Destination getObject() throws Exception
    {
        if (!pubSubDomain)
        {
            return HornetQJMSClient.createQueue(destinationName);
        }
        return HornetQJMSClient.createTopic(destinationName);
    }

    /** @return {@code Destination.class} */
    @Override
    public Class<?> getObjectType()
    {
        return Destination.class;
    }

    /** @return always {@code true} */
    @Override
    public boolean isSingleton()
    {
        return true;
    }

    /** @param pubSubDomain {@code true} for a topic, {@code false} for a queue */
    public void setPubSubDomain(boolean pubSubDomain)
    {
        this.pubSubDomain = pubSubDomain;
    }

    /** @param destinationName name of the queue or topic to create */
    public void setDestinationName(String destinationName)
    {
        this.destinationName = destinationName;
    }
}

MessageHandler(接收到的消息會交給onMessage方法處理)(爲簡單起見,你也可以實現javax.jms.MessageListener,而不是SessionAwareMessageListener):

public class MessageHandler implements org.springframework.jms.listener.SessionAwareMessageListener<Message> 
{ 
/**
 * Prints the body of a text message and commits the session; any other
 * message type causes the session to be rolled back.
 *
 * @param msg     the received message
 * @param session the session the message was received on
 * @throws JMSException if reading the text, committing or rolling back fails
 */
@Override
public void onMessage(Message msg, Session session) throws JMSException
{
    if (!(msg instanceof TextMessage))
    {
        // Not a text message: roll back so the message is returned to the queue.
        session.rollback();
        return;
    }

    TextMessage textMessage = (TextMessage) msg;
    System.out.println(textMessage.getText());
    session.commit();
}