2013-07-30

Spring JmsTemplate: receiving all messages in one transaction

I am trying to fetch all messages from a queue in synchronous mode using the Spring JmsTemplate.receive(String) method.

The problem is that I always get only one message. Here is the code:

@Transactional
public List<Message> receiveAllFromQueue(String destination) {
    List<Message> messages = new ArrayList<Message>();
    Message message;
    while ((message = queueJmsTemplate.receive(destination)) != null) {
        messages.add(message);
    }
    return messages;
}

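If I remove the @Transactional annotation I get all the messages, but everything then happens outside a transaction, so if an exception occurs later while these messages are being processed, the messages are lost.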

Here is my JmsTemplate bean definition.

<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="connectionFactory" /> 
    <property name="pubSubDomain" value="false" /> 
    <property name="receiveTimeout" value="1" /> 
    <property name="sessionTransacted" value="true" /> 
</bean> 

What I want to achieve is a single transaction in which I receive all pending messages.
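The desired behaviour can be pictured with a minimal plain-Java sketch. It does not touch JMS or Spring at all: InMemoryTxQueue and its methods are made-up names that only imitate how a transacted session is expected to behave (received messages stay "in flight" until commit, and a rollback makes them available again).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory stand-in for a transacted session on one queue (not a JMS API).
final class InMemoryTxQueue {
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> uncommitted = new ArrayList<>();

    InMemoryTxQueue(List<String> initial) {
        queue.addAll(initial);
    }

    // Returns the next message, or null when nothing is pending.
    // The message is remembered as in flight until commit() or rollback().
    String receive() {
        String message = queue.pollFirst();
        if (message != null) {
            uncommitted.add(message);
        }
        return message;
    }

    // Receives everything that is currently pending within the same unit of work.
    List<String> receiveAll() {
        List<String> batch = new ArrayList<>();
        String message;
        while ((message = receive()) != null) {
            batch.add(message);
        }
        return batch;
    }

    // Processing succeeded: the in-flight messages are gone for good.
    void commit() {
        uncommitted.clear();
    }

    // Processing failed: put the in-flight messages back at the head, in their original order.
    void rollback() {
        for (int i = uncommitted.size() - 1; i >= 0; i--) {
            queue.addFirst(uncommitted.get(i));
        }
        uncommitted.clear();
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        InMemoryTxQueue q = new InMemoryTxQueue(Arrays.asList("m1", "m2", "m3"));
        List<String> batch = q.receiveAll();
        System.out.println(batch + " pending=" + q.pending()); // [m1, m2, m3] pending=0
        q.rollback(); // simulate a failure while processing the batch
        System.out.println("pending=" + q.pending());          // pending=3
    }
}
```

The point of the sketch is only the contract: one receiveAll call, one commit or rollback for the whole batch.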

Answer

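I will answer my own question. It looks like JmsTemplate does not support this. For now, the only way I found to work around it is to extend JmsTemplate and add a new method that reuses parts of JmsTemplate. Unfortunately some of those methods are private, so they need to be copied...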

import java.util.ArrayList;
import java.util.List;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.springframework.jms.connection.JmsResourceHolder;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsUtils;
import org.springframework.transaction.support.TransactionSynchronizationManager;

// NOTE: JmsTemplateDelegate is the superclass name as posted. The answer describes
// extending JmsTemplate, so it is presumably a JmsTemplate subclass.
public class CustomQueueJmsTemplate extends JmsTemplateDelegate {

    public List<Message> receiveAll(String destinationName) {
        return receiveAll(destinationName, null);
    }

    public List<Message> receiveAll(final String destinationName, final String messageSelector) {
        return execute(new SessionCallback<List<Message>>() {
            @Override
            public List<Message> doInJms(Session session) throws JMSException {
                Destination destination = resolveDestinationName(session, destinationName);
                return doReceiveAll(session, destination, messageSelector);
            }
        }, true);
    }

    private List<Message> doReceiveAll(Session session, Destination destination, String messageSelector)
            throws JMSException {
        return doReceiveAll(session, createConsumer(session, destination, messageSelector));
    }

    // Copy of JmsTemplate's receive logic, changed to loop on one consumer instead of reading one message.
    private List<Message> doReceiveAll(Session session, MessageConsumer consumer) throws JMSException {
        try {
            // Use transaction timeout (if available).
            long timeout = getReceiveTimeout();
            JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
                    .getResource(getConnectionFactory());
            if (resourceHolder != null && resourceHolder.hasTimeout()) {
                timeout = resourceHolder.getTimeToLiveInMillis();
            }

            // START OF MODIFIED CODE
            // Keep receiving until a receive call returns null (nothing arrived within the timeout).
            List<Message> messages = new ArrayList<>();
            Message message;
            while ((message = doReceive(consumer, timeout)) != null) {
                messages.add(message);
            }
            // END OF MODIFIED CODE

            if (session.getTransacted()) {
                // Commit necessary - but avoid commit call within a JTA transaction.
                if (isSessionLocallyTransacted(session)) {
                    // Transacted session created by this template -> commit.
                    JmsUtils.commitIfNecessary(session);
                }
            } else if (isClientAcknowledge(session)) {
                // Manually acknowledge messages, if any.
                for (Message retrievedMessage : messages) {
                    retrievedMessage.acknowledge();
                }
            }
            return messages;
        } finally {
            JmsUtils.closeMessageConsumer(consumer);
        }
    }

    // Copied because the equivalent method in JmsTemplate is private.
    private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {
        if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
            return consumer.receiveNoWait();
        } else if (timeout > 0) {
            return consumer.receive(timeout);
        } else {
            // Blocks until a message arrives, so the loop in doReceiveAll never ends with this setting.
            return consumer.receive();
        }
    }

}
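
A side note on the loop in doReceiveAll: it only ends when a receive call returns null, which happens with the no-wait setting or with a positive timeout, but not with the blocking branch of doReceive. Below is a plain-Java analogy of that loop, as a sketch only. BlockingQueue stands in for the JMS MessageConsumer, and DrainDemo, drain, receive and NO_WAIT are made-up names for illustration, not Spring or JMS API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy of the drain loop; a BlockingQueue plays the role of the consumer.
final class DrainDemo {

    // Assumed sentinel for "do not wait at all", mirroring the no-wait branch in the answer.
    static final long NO_WAIT = -1;

    // Takes elements until one receive attempt comes back empty (null).
    static <T> List<T> drain(BlockingQueue<T> queue, long timeoutMillis) {
        if (timeoutMillis != NO_WAIT && timeoutMillis <= 0) {
            // The answer's doReceive would block indefinitely here, so the loop could never finish.
            throw new IllegalArgumentException("use NO_WAIT or a positive timeout");
        }
        List<T> drained = new ArrayList<>();
        T item;
        while ((item = receive(queue, timeoutMillis)) != null) {
            drained.add(item);
        }
        return drained;
    }

    private static <T> T receive(BlockingQueue<T> queue, long timeoutMillis) {
        if (timeoutMillis == NO_WAIT) {
            return queue.poll(); // analogous to receiveNoWait()
        }
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS); // analogous to receive(timeout)
        } catch (InterruptedException e) {
            // Treat interruption as "no more data" and restore the interrupt flag.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        System.out.println(drain(queue, 1)); // [a, b, c]
        System.out.println(queue.isEmpty()); // true
    }
}
```

With receiveTimeout set to 1 as in the bean definition from the question, the loop takes the positive-timeout path and stops at the first receive that finds nothing within 1 ms.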