如何在使用Spring Boot時更改嵌入式ActiveMQ的交付策略?我試着在DefaultJmsListenerContainerFactory上指定FixedBackOff,但它沒有幫助。下面是我用來初始化jms工廠bean的代碼。我有一個消息使用者正在處理隊列中的傳入消息。在處理期間由於不可用的資源,我拋出一個檢查異常。我希望在固定的時間間隔後重新傳遞消息進行處理。更改Sprint啓動中嵌入式ActiveMQ的活動MQ RedeliveryPolicy
春季啓動:1.5.7.Release
的Java:1.7
@Bean
public JmsListenerContainerFactory<?> publishFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setBackOff(new FixedBackOff(5000, 5));
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
factory.setErrorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
LOG.error("Error occured in JMS transaction.", t);
}
});
return factory;
}
消費者代碼:
@JmsListener(destination = "PublishQueue", containerFactory = "publishFactory")
@Transactional
public void receiveMessage(PublishData publishData) {
LOG.info("Processing incoming message on publish queue with transaction id: " + publishData.getTransactionId());
PublishUser user = new PublishUser();
user.setPriority(1);
user.setUserId(publishData.getUserId());
LOG.trace("Trying to enroll in the publish lock queue for user: " + user);
PublishLockQueue lockQueue = publishLockQueueService.createLock(user);
if (lockQueue == null)
throw new RuntimeException("Unable to create lock for publish");
LOG.trace("Publish lock queue obtained with lock queue id: " + lockQueue.getId());
try {
publishLockQueueService.acquireLock(lockQueue.getId());
LOG.trace("Acquired publish lock.");
}
catch (PublishLockQueueServiceException pex) {
throw new RuntimeException(pex);
}
try {
publishService.publish(publishData, lockQueue.getId());
LOG.trace("Completed publish of changes.");
sendPublishSuccessNotification(publishData);
}
finally {
LOG.trace("Trying to release lock to publish.");
publishLockQueueService.releaseLock(lockQueue.getId());
}
LOG.info("Publish has been completed for transaction id: " + publishData.getTransactionId());
}
其消費者,您需要使用事務確認模式讓消費者回滾異常,並讓ActiveMQ能夠將消息重新傳遞給同一個消費者或另一個消費者,前提是您有多個消費者正在運行。但是,您可以在ActiveMQ上配置重新傳遞選項,如退避等。上面的錯誤處理程序只是一個noop監聽程序,除了日誌記錄之外,它們無法完成。 –
你能分享你的代碼嗎? – Makoton
@Makoton我也包含了消費者代碼。 – user320587