2017-10-05 68 views
2

如何在使用Spring Boot時更改嵌入式ActiveMQ的交付策略?我試着在DefaultJmsListenerContainerFactory上指定FixedBackOff,但它沒有幫助。下面是我用來初始化jms工廠bean的代碼。我有一個消息使用者正在處理隊列中的傳入消息。在處理期間由於不可用的資源,我拋出一個檢查異常。我希望在固定的時間間隔後重新傳遞消息進行處理。更改Sprint啓動中嵌入式ActiveMQ的活動MQ RedeliveryPolicy

春季啓動:1.5.7.Release

的Java:1.7

@Bean 
public JmsListenerContainerFactory<?> publishFactory(ConnectionFactory connectionFactory, 
               DefaultJmsListenerContainerFactoryConfigurer configurer) { 
    DefaultJmsListenerContainerFactory factory = 
     new DefaultJmsListenerContainerFactory(); 

    factory.setBackOff(new FixedBackOff(5000, 5)); 

    // This provides all boot's default to this factory, including the message converter 
    configurer.configure(factory, connectionFactory); 

    // You could still override some of Boot's default if necessary. 
    factory.setErrorHandler(new ErrorHandler() { 

     @Override 
     public void handleError(Throwable t) { 
      LOG.error("Error occured in JMS transaction.", t); 
     } 

    }); 
    return factory; 
} 

消費者代碼:

@JmsListener(destination = "PublishQueue", containerFactory = "publishFactory") 
@Transactional 
public void receiveMessage(PublishData publishData) { 
    LOG.info("Processing incoming message on publish queue with transaction id: " + publishData.getTransactionId()); 

    PublishUser user = new PublishUser(); 
    user.setPriority(1); 
    user.setUserId(publishData.getUserId()); 

    LOG.trace("Trying to enroll in the publish lock queue for user: " + user); 
    PublishLockQueue lockQueue = publishLockQueueService.createLock(user); 
    if (lockQueue == null) 
     throw new RuntimeException("Unable to create lock for publish"); 
    LOG.trace("Publish lock queue obtained with lock queue id: " + lockQueue.getId()); 

    try { 
     publishLockQueueService.acquireLock(lockQueue.getId()); 
     LOG.trace("Acquired publish lock."); 
    } 
    catch (PublishLockQueueServiceException pex) { 
     throw new RuntimeException(pex); 
    } 

    try { 
     publishService.publish(publishData, lockQueue.getId()); 
     LOG.trace("Completed publish of changes."); 

     sendPublishSuccessNotification(publishData); 
    } 
    finally { 
     LOG.trace("Trying to release lock to publish."); 
     publishLockQueueService.releaseLock(lockQueue.getId()); 
    } 

    LOG.info("Publish has been completed for transaction id: " + publishData.getTransactionId()); 
} 
+0

其消費者,您需要使用事務確認模式讓消費者回滾異常,並讓ActiveMQ能夠將消息重新傳遞給同一個消費者或另一個消費者,前提是您有多個消費者正在運行。但是,您可以在ActiveMQ上配置重新傳遞選項,如退避等。上面的錯誤處理程序只是一個noop監聽程序,除了日誌記錄之外,它們無法完成。 –

+1

你能分享你的代碼嗎? – Makoton

+1

@Makoton我也包含了消費者代碼。 – user320587

回答

0

@claus answerd:我測試了它的工作:

它的消費者,你需要使用事務確認模式讓消費者回滾異常,並讓ActiveMQ能夠將消息重新傳遞給同一個消費者或另一個消費者(如果有多個消費者正在運行)。但是,您可以在ActiveMQ上配置重新傳送選項,例如退避等。上述錯誤處理程序只是一個noop監聽程序,除了日誌記錄以外不能做很多其他操作

+0

正如我在評論中提到的,我的問題不在於重新交付,而在於重新交付的時間間隔。我知道我可以直接使用ActiveMQ連接工廠對象來指定退避。這意味着我需要包含特定於ActiveMQ的代碼。我希望不這樣做,只需使用Spring DefaultJmsListenerContainerFactory#setBackOff方法指定退避策略 – user320587