2013-06-04

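Camel, ActiveMQ and Atomikos integration - message consumed on exception - not retried

I have set up Camel with ActiveMQ and Atomikos, transacted. When a RuntimeException is thrown in a processor, I would expect ActiveMQ to retry the message, but the message is not returned to the queue. Please see the configuration below.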

<amq:broker id="my-broker" useJmx="true" persistent="false" 
    brokerName="localhost" schedulerSupport="true"> 
    <amq:plugins> 
     <amq:redeliveryPlugin fallbackToDeadLetter="true" 
      sendToDlqIfMaxRetriesExceeded="true"> 
      <amq:redeliveryPolicyMap> 
       <amq:redeliveryPolicyMap> 
        <!-- <amq:redeliveryPolicyEntries> <amq:redeliveryPolicy queue="SpecialQueue" 
         maximumRedeliveries="4" redeliveryDelay="10000" /> </amq:redeliveryPolicyEntries> --> 
        <!-- the fallback policy for all other destinations --> 
        <amq:defaultEntry> 
         <amq:redeliveryPolicy maximumRedeliveries="4" 
          initialRedeliveryDelay="1000" redeliveryDelay="2000" /> 
        </amq:defaultEntry> 
       </amq:redeliveryPolicyMap> 
      </amq:redeliveryPolicyMap> 
     </amq:redeliveryPlugin> 
    </amq:plugins> 
    <amq:transportConnectors> 
     <amq:transportConnector uri="tcp://localhost:61616" /> 
    </amq:transportConnectors> 
</amq:broker> 

<!-- use the XA-specific version of the connection factory --> 
<bean id="amq.connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory" 
    depends-on="my-broker"> 
    <property name="brokerURL" value="tcp://localhost:61616" /> 
</bean> 
<bean id="xa.connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"> 
    <property name="uniqueResourceName" value="amq1" /> 
    <property name="xaConnectionFactory" ref="amq.connectionFactory" /> 
</bean> 

<bean id="jta.transactionManager" 
    class="org.springframework.transaction.jta.JtaTransactionManager"> 
    <property name="transactionManager"> 
     <bean class="com.atomikos.icatch.jta.UserTransactionManager" 
      init-method="init" destroy-method="close"> 
      <property name="forceShutdown" value="false" /> 
     </bean> 
    </property> 
    <property name="userTransaction"> 
     <bean class="com.atomikos.icatch.jta.UserTransactionImp"> 
      <property name="transactionTimeout" value="300" /> 
     </bean> 
    </property> 
</bean> 
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="amq.connectionFactory" /> 
    <property name="transacted" value="true" /> 
    <property name="transactionManager" ref="jta.transactionManager" /> 
</bean> 

<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" ref="jmsConfig" /> 
</bean> 
<bean id="service1" class="com.soelvar.camel.processor.impl.Service1Impl" /> 

<camelContext xmlns="http://camel.apache.org/schema/spring"> 
    <camel:onException> 
     <camel:exception>java.lang.Throwable</camel:exception> 
     <camel:redeliveryPolicy redeliveryDelay="1000" 
      maximumRedeliveries="3"></camel:redeliveryPolicy> 
     <camel:handled> 
      <constant>false</constant> 
     </camel:handled> 
     <!-- <camel:rollback markRollbackOnly="true" /> --> 
    </camel:onException> 

    <route id="orderProcessingQueue"> 
     <from uri="jms:orderProcessingQueue" /> 
     <to uri="bean:service1?method=process" /> 
    </route> 
</camelContext> 

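The stack trace I get is below. Camel first retries 3 times before the RuntimeException is thrown. The transaction is then rolled back, but nothing ends up back on the queue and the message is not retried.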

[Consumer[orderProcessingQueue]] TaskManager     INFO THREADS: using JDK thread pooling... 
[hread #3 - JmsConsumer[orders]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000300117 
[Consumer[orderProcessingQueue]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000100117 
[ - JmsConsumer[incomingOrders]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000200117 
null 
com.soelvar.camel.processor.impl.Service1Impl 
############################1############################## 
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp  INFO commit() done (by application) of transaction 10.28.118.149.tm0000300117 
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp  INFO commit() done (by application) of transaction 10.28.118.149.tm0000200117 
[ - JmsConsumer[incomingOrders]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000400117 
[hread #3 - JmsConsumer[orders]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000500117 
com.soelvar.camel.processor.impl.Service1Impl 
############################2############################## 
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp  INFO commit() done (by application) of transaction 10.28.118.149.tm0000400117 
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp  INFO commit() done (by application) of transaction 10.28.118.149.tm0000500117 
[hread #3 - JmsConsumer[orders]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000600117 
[ - JmsConsumer[incomingOrders]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000700117 
com.soelvar.camel.processor.impl.Service1Impl 
############################3############################## 
com.soelvar.camel.processor.impl.Service1Impl 
############################4############################## 
[Consumer[orderProcessingQueue]] DefaultErrorHandler   ERROR Failed delivery for (MessageId: ID:WS340278-50162-1370387698217-5:4:1:1:1 on ExchangeId: ID-WS340278-50165-1370387698547-0-3). Exhausted after delivery attempt: 4 caught: java.lang.RuntimeException 
java.lang.RuntimeException 
    at com.soelvar.camel.processor.impl.Service1Impl.process(Service1Impl.java:22) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:391) 
    at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:278) 
    at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:251) 
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:161) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) 
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:67) 
    at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:101) 
    at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71) 
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) 
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122) 
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298) 
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) 
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) 
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) 
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:390) 
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) 
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) 
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) 
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) 
    at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) 
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86) 
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:104) 
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562) 
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500) 
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468) 
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326) 
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:244) 
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1069) 
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1061) 
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:958) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 
[Consumer[orderProcessingQueue]] EndpointMessageListener  WARN Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - java.lang.RuntimeException] 
org.apache.camel.RuntimeCamelException: java.lang.RuntimeException 
    at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1338) 
    at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:187) 
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:108) 
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562) 
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500) 
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468) 
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326) 
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:244) 
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1069) 
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1061) 
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:958) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 
Caused by: java.lang.RuntimeException 
    at com.soelvar.camel.processor.impl.Service1Impl.process(Service1Impl.java:22) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:391) 
    at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:278) 
    at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:251) 
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:161) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) 
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:67) 
    at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:101) 
    at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71) 
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) 
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122) 
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298) 
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) 
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) 
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) 
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:390) 
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) 
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) 
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) 
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) 
    at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) 
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) 
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) 
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) 
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86) 
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:104) 
    ... 11 more 
[Consumer[orderProcessingQueue]] CompositeTransactionImp  INFO rollback() done of transaction 10.28.118.149.tm0000100117 
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp  INFO commit() done (by application) of transaction 10.28.118.149.tm0000600117 
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp  INFO commit() done (by application) of transaction 10.28.118.149.tm0000700117 
[Consumer[orderProcessingQueue]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000800117 
[hread #3 - JmsConsumer[orders]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0000900117 
[ - JmsConsumer[incomingOrders]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0001000117 
[Consumer[orderProcessingQueue]] CompositeTransactionImp  INFO commit() done (by application) of transaction 10.28.118.149.tm0000800117 
[Consumer[orderProcessingQueue]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0001100117 
[hread #3 - JmsConsumer[orders]] CompositeTransactionImp  INFO commit() done (by application) of transaction 10.28.118.149.tm0000900117 
[ - JmsConsumer[incomingOrders]] CompositeTransactionImp  INFO commit() done (by application) of transaction 10.28.118.149.tm0001000117 
[hread #3 - JmsConsumer[orders]] BaseTransactionManager   INFO createCompositeTransaction (300000): created new ROOT transaction with id 10.28.118.149.tm0001200117 

Answers

See this page about transactions: http://camel.apache.org/transactional-client.html

If you have a copy of the Camel in Action book, chapter 9 is about transactions.

You need to add <transacted/> to the route so that Camel uses the TX manager.
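
For the route in the question, that would look roughly like the sketch below. It assumes the Spring XML DSL already used there, and that a single transaction manager (here the `jta.transactionManager` bean from the question) can be resolved from the Spring context. Nothing else in the route is changed.

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route id="orderProcessingQueue">
        <from uri="jms:orderProcessingQueue" />
        <!-- marks the route as transacted; with no ref attribute, Camel
             looks up the transaction manager from the Spring context
             (assumed here to be the jta.transactionManager bean) -->
        <transacted />
        <to uri="bean:service1?method=process" />
    </route>
</camelContext>
```

The `<transacted />` element goes directly after `<from>`, so that everything downstream in the route runs inside the transaction.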

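Thanks for getting back to me. I am aware of the transacted tag and have tested with it as well, but I believe it should still be able to roll back and retry the message. – user2453706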

I debugged my configuration a bit and found which part of it was preventing the ActiveMQ retries.

The following JmsConfiguration does not work.

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="amq.connectionFactory" /> 
    <property name="transacted" value="true" /> 
    <property name="transactionManager" ref="jta.transactionManager" /> 
</bean> 

<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" ref="jmsConfig" /> 
</bean> 

But this does.

<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="transacted" value="true" /> 
    <property name="transactionManager" ref="jta.transactionManager" /> 
</bean> 

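Unfortunately, I now cannot get ActiveMQ to pick up the redeliveryPlugin configuration, but I have seen that a defect has been raised for this issue. Does anyone know a workaround? I believe this is specific to org.apache.activemq.ActiveMQXAConnectionFactory.

Cheers, Jesper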
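
On the workaround question above: one thing that could be tried, as an untested assumption rather than a confirmed fix, is configuring redelivery on the client side through the `redeliveryPolicy` property that `ActiveMQXAConnectionFactory` inherits from `ActiveMQConnectionFactory`. The values below simply mirror the broker-side default entry from the question. Whether an XA rollback honours them in this setup is not confirmed anywhere in this thread.

```xml
<!-- sketch only: same XA connection factory bean as in the question,
     with a client-side redelivery policy added (assumed workaround) -->
<bean id="amq.connectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory"
    depends-on="my-broker">
    <property name="brokerURL" value="tcp://localhost:61616" />
    <property name="redeliveryPolicy">
        <bean class="org.apache.activemq.RedeliveryPolicy">
            <!-- values copied from the broker redeliveryPlugin default entry -->
            <property name="maximumRedeliveries" value="4" />
            <property name="initialRedeliveryDelay" value="1000" />
            <property name="redeliveryDelay" value="2000" />
        </bean>
    </property>
</bean>
```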