1
我正在嘗試在我們的Spring應用程序中集成Rabbit MQ代理。我能夠成功地使用消息,但需要添加錯誤處理。 監聽器使用該消息並向其應用業務邏輯,其中包括數據庫寫入。業務邏輯可以拋出異常。春季錯誤處理RabbitMQ
在這些例外情況,我需要
- 回滾數據庫寫入的情況。
- 寫入Db中的錯誤表,指示msg失敗。
- 消息不應重新排隊。
對於
要求#1 - 已經在3210加入
txManager
和註釋的Listner.listen()
方法與@Transactional
要求#2 - 添加錯誤處理程序和定製實施
DefaultExceptionStrategey
需求#3 - 已設置
DefaultRequeueRejected=false
但當BusinessRuntimeException
從監聽器拋出的ErrorHandler是沒有得到調用。 不知道缺少什麼。 errorHandler
僅對某些異常被調用?
的Config.xml
<tx:annotation-driven transaction-manager="txManager" />
<bean id="txManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="allowCustomIsolationLevels" value="true" />
<rabbit:connection-factory id="rabbitConnectionFactory"/>
<rabbit:template id="rabbitTemplate" connection-
factory="rabbitConnectionFactory" message-converter="jsonMessageConverter"
channel-transacted="true"/>
<rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/>
RabbitMQConfiguration.java
@Configuration
@EnableRabbit
public class RabbitMqConfiguration {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Autowired
private MessageConverter jsonMessageConverter;
@Bean
public SimpleRabbitListenerContainerFactory exportPartyListenerContainer() {
SimpleRabbitListenerContainerFactory listenerContainer = new SimpleRabbitListenerContainerFactory();
listenerContainer.setConnectionFactory(rabbitConnectionFactory);
listenerContainer.setMessageConverter(jsonMessageConverter);
listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
listenerContainer.setChannelTransacted(true);
listenerContainer.setDefaultRequeueRejected(false);
listenerContainer.setErrorHandler(errorHandler());
return listenerContainer;
}
@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(new ExceptionStrategy());
} }
ExceptionStrategy.java
public class ExceptionStrategy extends DefaultExceptionStrategy {
@Autowired
private Dao daoBean;
@Override
public boolean isFatal(Throwable t) {
if (t instanceof BusinessRuntimeException) {
BusinessRuntimeException businessException = (BusinessRuntimeException) t;
//db call
daoBean.updateRecordStaus();
return true;
}
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
logger.error(
"Failed to process inbound message from queue " + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
+ "; failed message: " + lefe.getFailedMessage(),
t);
}
return super.isFatal(t);
}}