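Camel ActiveMQ and Atomikos integration: message consumed on exception, not retried

I have set up Camel transactions with ActiveMQ and Atomikos. When a RuntimeException is thrown in a processor, I expect ActiveMQ to redeliver the message, but the message is not returned to the queue. Please see the configuration below.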
<amq:broker id="my-broker" useJmx="true" persistent="false"
            brokerName="localhost" schedulerSupport="true">
  <amq:plugins>
    <amq:redeliveryPlugin fallbackToDeadLetter="true"
                          sendToDlqIfMaxRetriesExceeded="true">
      <amq:redeliveryPolicyMap>
        <amq:redeliveryPolicyMap>
          <!-- <amq:redeliveryPolicyEntries> <amq:redeliveryPolicy queue="SpecialQueue"
               maximumRedeliveries="4" redeliveryDelay="10000" /> </amq:redeliveryPolicyEntries> -->
          <!-- the fallback policy for all other destinations -->
          <amq:defaultEntry>
            <amq:redeliveryPolicy maximumRedeliveries="4"
                                  initialRedeliveryDelay="1000" redeliveryDelay="2000" />
          </amq:defaultEntry>
        </amq:redeliveryPolicyMap>
      </amq:redeliveryPolicyMap>
    </amq:redeliveryPlugin>
  </amq:plugins>
  <amq:transportConnectors>
    <amq:transportConnector uri="tcp://localhost:61616" />
  </amq:transportConnectors>
</amq:broker>
<!-- use the XA-specific version of the connection factory -->
<bean id="amq.connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory"
      depends-on="my-broker">
  <property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="xa.connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean">
  <property name="uniqueResourceName" value="amq1" />
  <property name="xaConnectionFactory" ref="amq.connectionFactory" />
</bean>
<bean id="jta.transactionManager"
      class="org.springframework.transaction.jta.JtaTransactionManager">
  <property name="transactionManager">
    <bean class="com.atomikos.icatch.jta.UserTransactionManager"
          init-method="init" destroy-method="close">
      <property name="forceShutdown" value="false" />
    </bean>
  </property>
  <property name="userTransaction">
    <bean class="com.atomikos.icatch.jta.UserTransactionImp">
      <property name="transactionTimeout" value="300" />
    </bean>
  </property>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="amq.connectionFactory" />
  <property name="transacted" value="true" />
  <property name="transactionManager" ref="jta.transactionManager" />
</bean>
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="configuration" ref="jmsConfig" />
</bean>
<bean id="service1" class="com.soelvar.camel.processor.impl.Service1Impl" />
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <camel:onException>
    <camel:exception>java.lang.Throwable</camel:exception>
    <camel:redeliveryPolicy redeliveryDelay="1000"
                            maximumRedeliveries="3"></camel:redeliveryPolicy>
    <camel:handled>
      <constant>false</constant>
    </camel:handled>
    <!-- <camel:rollback markRollbackOnly="true" /> -->
  </camel:onException>
  <route id="orderProcessingQueue">
    <from uri="jms:orderProcessingQueue" />
    <to uri="bean:service1?method=process" />
  </route>
</camelContext>
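The log output and stack trace I get are shown below. Camel first retries 3 times before the RuntimeException propagates. The transaction is then rolled back, but nothing is put back on the queue and the message is not redelivered?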
[Consumer[orderProcessingQueue]] TaskManager INFO THREADS: using JDK thread pooling...
[hread #3 - JmsConsumer[orders]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000300117
[Consumer[orderProcessingQueue]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000100117
[ - JmsConsumer[incomingOrders]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000200117
null
com.soelvar.camel.processor.impl.Service1Impl
############################1##############################
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp INFO commit() done (by application) of transaction 10.28.118.149.tm0000300117
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp INFO commit() done (by application) of transaction 10.28.118.149.tm0000200117
[ - JmsConsumer[incomingOrders]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000400117
[hread #3 - JmsConsumer[orders]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000500117
com.soelvar.camel.processor.impl.Service1Impl
############################2##############################
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp INFO commit() done (by application) of transaction 10.28.118.149.tm0000400117
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp INFO commit() done (by application) of transaction 10.28.118.149.tm0000500117
[hread #3 - JmsConsumer[orders]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000600117
[ - JmsConsumer[incomingOrders]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000700117
com.soelvar.camel.processor.impl.Service1Impl
############################3##############################
com.soelvar.camel.processor.impl.Service1Impl
############################4##############################
[Consumer[orderProcessingQueue]] DefaultErrorHandler ERROR Failed delivery for (MessageId: ID:WS340278-50162-1370387698217-5:4:1:1:1 on ExchangeId: ID-WS340278-50165-1370387698547-0-3). Exhausted after delivery attempt: 4 caught: java.lang.RuntimeException
java.lang.RuntimeException
at com.soelvar.camel.processor.impl.Service1Impl.process(Service1Impl.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:391)
at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:278)
at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:251)
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:161)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:67)
at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:101)
at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122)
at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:390)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:104)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:244)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1069)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1061)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:958)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
[Consumer[orderProcessingQueue]] EndpointMessageListener WARN Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - java.lang.RuntimeException]
org.apache.camel.RuntimeCamelException: java.lang.RuntimeException
at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1338)
at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:187)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:108)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:244)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1069)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1061)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:958)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.RuntimeException
at com.soelvar.camel.processor.impl.Service1Impl.process(Service1Impl.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:391)
at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:278)
at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:251)
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:161)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:67)
at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:101)
at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122)
at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:390)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:104)
... 11 more
[Consumer[orderProcessingQueue]] CompositeTransactionImp INFO rollback() done of transaction 10.28.118.149.tm0000100117
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp INFO commit() done (by application) of transaction 10.28.118.149.tm0000600117
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp INFO commit() done (by application) of transaction 10.28.118.149.tm0000700117
[Consumer[orderProcessingQueue]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000800117
[hread #3 - JmsConsumer[orders]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000900117
[ - JmsConsumer[incomingOrders]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0001000117
[Consumer[orderProcessingQueue]] CompositeTransactionImp INFO commit() done (by application) of transaction 10.28.118.149.tm0000800117
[Consumer[orderProcessingQueue]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0001100117
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp INFO commit() done (by application) of transaction 10.28.118.149.tm0000900117
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp INFO commit() done (by application) of transaction 10.28.118.149.tm0001000117
[hread #3 - JmsConsumer[orders]] BaseTransactionManager INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0001200117
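Thanks for your reply. I am aware of the transacted tag and have tested with it as well, but I believe it should still be able to roll back and retry the message. – user2453706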