2017-07-26 141 views
1

我正在嘗試在我們的Spring應用程序中集成Rabbit MQ代理。我能夠成功地使用消息,但需要添加錯誤處理。 監聽器使用該消息並向其應用業務邏輯,其中包括數據庫寫入。業務邏輯可以拋出異常。春季錯誤處理RabbitMQ

在這些例外情況,我需要

  1. 回滾數據庫寫入的情況。
  2. 寫入Db中的錯誤表,指示msg失敗。
  3. 消息不應重新排隊。

對於

  • 要求#1 - 已經在​​3210加入txManager和註釋的Listner.listen()方法與@Transactional

  • 要求#2 - 添加錯誤處理程序和定製實施DefaultExceptionStrategey

  • 需求#3 - 已設置DefaultRequeueRejected=false

但當BusinessRuntimeException從監聽器拋出的ErrorHandler是沒有得到調用。 不知道缺少什麼。 errorHandler僅對某些異常被調用?

的Config.xml

<tx:annotation-driven transaction-manager="txManager" /> 
<bean id="txManager" 
class="org.springframework.transaction.jta.JtaTransactionManager"> 
<property name="allowCustomIsolationLevels" value="true" /> 

<rabbit:connection-factory id="rabbitConnectionFactory"/> 
<rabbit:template id="rabbitTemplate" connection- 
factory="rabbitConnectionFactory" message-converter="jsonMessageConverter" 
channel-transacted="true"/> 
<rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/> 

RabbitMQConfiguration.java

@Configuration 
@EnableRabbit 
public class RabbitMqConfiguration { 

@Autowired 
private ConnectionFactory rabbitConnectionFactory; 

@Autowired 
private MessageConverter jsonMessageConverter; 

@Bean 
public SimpleRabbitListenerContainerFactory exportPartyListenerContainer() { 
    SimpleRabbitListenerContainerFactory listenerContainer = new SimpleRabbitListenerContainerFactory(); 
    listenerContainer.setConnectionFactory(rabbitConnectionFactory); 
    listenerContainer.setMessageConverter(jsonMessageConverter); 
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); 
    listenerContainer.setChannelTransacted(true); 
    listenerContainer.setDefaultRequeueRejected(false); 
    listenerContainer.setErrorHandler(errorHandler()); 
    return listenerContainer; 
} 

@Bean 
public ErrorHandler errorHandler() { 
    return new ConditionalRejectingErrorHandler(new ExceptionStrategy()); 
} } 

ExceptionStrategy.java

public class ExceptionStrategy extends DefaultExceptionStrategy { 

@Autowired 
private Dao daoBean; 

@Override 
public boolean isFatal(Throwable t) { 

    if (t instanceof BusinessRuntimeException) { 
     BusinessRuntimeException businessException = (BusinessRuntimeException) t; 
     //db call 
     daoBean.updateRecordStaus(); 
     return true; 
    } 

    if (t instanceof ListenerExecutionFailedException) { 
     ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t; 
     logger.error(
       "Failed to process inbound message from queue " + lefe.getFailedMessage().getMessageProperties().getConsumerQueue() 
         + "; failed message: " + lefe.getFailedMessage(), 
       t); 
    } 
    return super.isFatal(t); 
}} 

回答

0

將您的BusinessRuntimeException封裝到RuntimeException中。

catch(BusinessRuntimeException e) 
{ 
    throw new RuntimeException(e); 
}