2015-04-28 53 views
5

我試圖用Spring Integration和ActiveMQ Message Broker配置JMS。我的出站通道應該由JDBC消息存儲支持以防止數據丟失,例如經紀人或我的應用程序脫機。Spring與JMS + ActiveMQ集成:重新連接後,消息保留在JDBC消息存儲中

我的配置似乎工作到目前爲止,但是,JDBC消息存儲庫的行爲並不像我所期望的那樣。如果我斷開代理,發送到出站通道的消息將按預期保持,但在重新連接後,它們將保留在數據庫中,並且不會發送到隊列。然而,進一步的消息我重新連接之後發送到達隊列,如果我重新啓動我的應用程序中的持久消息終於發以及...

應用程序的context.xml

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:jms="http://www.springframework.org/schema/jms" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms" 
    xmlns:oxm="http://www.springframework.org/schema/oxm" 
    xmlns:j2ee="http://www.springframework.org/schema/jee" 
    xmlns:amq="http://activemq.apache.org/schema/core" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm.xsd 
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd 
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd 
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> 

<!-- ActiveMQ Configuration --> 
<jms:annotation-driven/> 
<int:annotation-config/> 

<!-- Factory for ActiveMQ connections from JNDI --> 
<j2ee:jndi-lookup jndi-name="java:comp/env/jms/connectionFactory" 
        id="jmsConnectionFactory" 
        expected-type="org.apache.activemq.ActiveMQConnectionFactory" 
        proxy-interface="javax.jms.ConnectionFactory" 
        lookup-on-startup="false" 
        resource-ref="true" 
        cache="true"/> 


<bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> 
    <property name="connectionFactory" ref="jmsConnectionFactory"/> 
    <property name="maxConnections" value="8"/> 
    <property name="reconnectOnException" value="true"/> 
</bean> 


<!-- JAXB-marshaller from spring-oxm used to handle XML-payload of messages --> 
<oxm:jaxb2-marshaller id="jax2bMarshaller" context-path="de.br.ecomx"/> 

<!-- message-converter --> 
<bean id="messageConverter" class="org.springframework.jms.support.converter.MarshallingMessageConverter"> 
    <property name="marshaller" ref="jax2bMarshaller"/> 
    <property name="unmarshaller" ref="jax2bMarshaller"/> 
</bean> 

<!-- Message Store implementation --> 
<!-- Persists messages temporariliy in DB to prevent data loss on server-shutdown --> 
<!-- Has to be injected specifically in Queue Channels --> 
<bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore"> 
    <property name="dataSource" ref="dataSource"/> 
    <property name="channelMessageStoreQueryProvider"> 
     <bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider"/> 
    </property> 
    <property name="usingIdCache" value="true"/> 
    <property name="region" value="BWH"/> 
    <property name="tablePrefix" value="BWH_"/> 
</bean> 

<!-- default poller for (non-message-driven) channel-adapters --> 
<int:poller id="defaultPoller" default="true" fixed-delay="500"> 
    <int:transactional propagation="REQUIRED" isolation="DEFAULT" transaction-manager="transactionManager"/> 
</int:poller> 

<!-- LOGGING --> 
<!--<int:logging-channel-adapter id="logger" level="DEBUG" log-full-message="true"/>--> 


<!-- CHANNEL DEFINITIONS --> 

<!-- intermediate channel for message enrichment --> 
<int:channel id="output_enricher_channel"><int:queue/></int:channel> 

<!-- general output-channel - specific channel is choosed by a router --> 
<int:channel id="output_router"><int:queue/></int:channel> 

<!-- channel definitions connected to consumer queues --> 
<int:channel id="bwh_articleMasterData_input_channel"><int:queue/></int:channel> 
<int:channel id="bwh_transferOrder_input_channel"><int:queue/></int:channel> 
<!-- add your channel here... --> 

<!-- channel definitions connected to producer queues --> 
<int:channel id="ax_transferOrderPacked_input_channel"><int:queue message-store="store"/></int:channel> 
<!-- add your channel here... --> 


<!-- MESSAGE HANDLING --> 

<!-- Enriches the 'messageId'-property of payload with 'id'-value from header and sends them to router --> 
<int:enricher input-channel="output_enricher_channel" output-channel="output_router"> 
    <int:property name="messageId" expression="headers.id"/> 
</int:enricher> 

<!-- routing of outgoing messages to their type-specific channel --> 
<int:payload-type-router input-channel="output_router"> 
    <int:mapping type="de.br.ecomx.TransferOrdersPacked" channel="ax_transferOrderPacked_input_channel"/> 
</int:payload-type-router> 


<!-- ENDPOINTS --> 

<!-- endpoint configurations connecting queues with channels --> 
<int-jms:message-driven-channel-adapter id="articleMasterDataConsumerChannelAdapter" connection-factory="connectionFactory" 
             destination-name="${queue.bwh.articlemasterdata.in}" channel="bwh_articleMasterData_input_channel" 
             message-converter="messageConverter" acknowledge="client" max-concurrent-consumers="10" 
             auto-startup="false"/> 
<int-jms:message-driven-channel-adapter id="transferOrderConsumerChannelAdapter" connection-factory="connectionFactory" 
             destination-name="${queue.bwh.transferorder.in}" channel="bwh_transferOrder_input_channel" 
             message-converter="messageConverter" acknowledge="client" max-concurrent-consumers="5" 
             auto-startup="false"/> 
<int-jms:outbound-channel-adapter id="transferOrderPackedProducerChannelAdapter" connection-factory="connectionFactory" 
            destination-name="${queue.ax.transferorderpacked.in}" channel="ax_transferOrderPacked_input_channel" 
            message-converter="messageConverter" delivery-persistent="true" session-transacted="true" 
            auto-startup="false"/> 

<!-- add your endpoint here... --> 
<!-- REMEMBER: If auto-startup is set to false (which is recommended for all channel adapters) --> 
<!--   the channel adapter has to be registered in ecomxConsumerChannelAdapterContainer below!!! --> 


<!-- gateway definitions usually injected by producer-services and used to send messages to an outbound-channel-adapter --> 
<int:gateway default-request-channel="output_enricher_channel" service-interface="de.lo.bwh.jms.gateway.EcomxProducerGateway"/> 

<!-- connect endpoints with services --> 
<int:service-activator input-channel="bwh_articleMasterData_input_channel" ref="ecomxArticleMasterDataConsumer"/> 
<int:service-activator input-channel="bwh_transferOrder_input_channel" ref="ecomxTransferOrderConsumer"/> 


<!-- container for endpoints used for start/stop all registered consumers/producers at once --> 
<bean id="ecomxConsumerChannelAdapterContainer" class="de.lo.bwh.jms.EcomxConsumerChannelAdapterContainer"> 
    <property name="endpoints"> 
     <list value-type="org.springframework.integration.jms.JmsMessageDrivenEndpoint"> 
      <ref bean="articleMasterDataConsumerChannelAdapter"/> 
      <ref bean="transferOrderConsumerChannelAdapter"/> 
     </list> 
    </property> 
</bean> 

<bean id="ecomxProducerChannelAdapterContainer" class="de.lo.bwh.jms.EcomxProducerChannelAdapterContainer"> 
    <property name="endpoints"> 
     <list value-type="org.springframework.integration.endpoint.PollingConsumer"> 
      <ref bean="transferOrderPackedProducerChannelAdapter"/> 
     </list> 
    </property> 
</bean> 

</beans> 

的context.xml

<Resource name="jms/connectionFactory" 
     auth="Container" 
     type="org.apache.activemq.ActiveMQConnectionFactory" 
     description="JMS Connection Factory" 
     factory="org.apache.activemq.jndi.JNDIReferenceFactory" 
     brokerURL="tcp://my.url.net:61616" 
     brokerName="myActiveMQBroker"/> 

我使用EcomxProducerGateway發送消息。它將消息發送到output_enricher_channel,將消息轉發到output_router,該消息將消息按有效負載類型路由到正確的出站通道。

斷開代理時,幾秒鐘後會記錄幾個例外情況。下面的堆棧跟蹤可能會有所幫助:

[WARN 08:37:07] SingleConnectionFactory.onException(322) | Encountered a JMSException - resetting the underlying JMS Connection 
javax.jms.JMSException: Operation timed out 
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54) 
at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1997) 
at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:2016) 
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101) 
at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126) 
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101) 
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101) 
at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:160) 
at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314) 
at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96) 
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:200) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketException: Operation timed out 
at java.net.SocketInputStream.socketRead0(Native Method) 
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
at java.net.SocketInputStream.read(SocketInputStream.java:170) 
at java.net.SocketInputStream.read(SocketInputStream.java:141) 
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50) 
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:609) 
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58) 
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:594) 
at java.io.DataInputStream.readInt(DataInputStream.java:387) 
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:258) 
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221) 
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213) 
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196) 
... 1 more 

[ERROR 08:37:07] LoggingHandler.handleMessageInternal(145) | org.springframework.jms.connection.SynchedLocalTransactionFailedException: Local JMS transaction failed to commit; nested exception is javax.jms.JMSException: Operation timed out 
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:424) 
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:404) 
at org.springframework.transaction.support.ResourceHolderSynchronization.afterCommit(ResourceHolderSynchronization.java:85) 
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:133) 
at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:121) 
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:954) 
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:799) 
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726) 
at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:515) 
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:291) 
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) 
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) 
at com.sun.proxy.$Proxy484.call(Unknown Source) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298) 
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) 
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292) 
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: javax.jms.JMSException: Operation timed out 
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54) 
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1420) 
at org.apache.activemq.TransactionContext.syncSendPacketWithInterruptionHandling(TransactionContext.java:761) 
at org.apache.activemq.TransactionContext.commit(TransactionContext.java:327) 
at org.apache.activemq.ActiveMQSession.commit(ActiveMQSession.java:574) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:384) 
at com.sun.proxy.$Proxy602.commit(Unknown Source) 
at org.springframework.jms.connection.JmsResourceHolder.commitAll(JmsResourceHolder.java:184) 
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:421) 
... 27 more 
Caused by: java.net.SocketException: Operation timed out 
at java.net.SocketInputStream.socketRead0(Native Method) 
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
at java.net.SocketInputStream.read(SocketInputStream.java:170) 
at java.net.SocketInputStream.read(SocketInputStream.java:141) 
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50) 
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:609) 
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58) 
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:594) 
at java.io.DataInputStream.readInt(DataInputStream.java:387) 
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:258) 
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221) 
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213) 
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196) 
... 1 more 

[ERROR 08:37:08] LoggingHandler.handleMessageInternal(145) | org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler#0]; nested exception is org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84) 
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:74) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146) 
at sun.reflect.GeneratedMethodAccessor464.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) 
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) 
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99) 
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281) 
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) 
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) 
at com.sun.proxy.$Proxy484.call(Unknown Source) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298) 
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) 
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) 
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292) 
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net 
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316) 
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169) 
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:496) 
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:579) 
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:705) 
at org.springframework.integration.jms.JmsSendingMessageHandler.send(JmsSendingMessageHandler.java:145) 
at org.springframework.integration.jms.JmsSendingMessageHandler.handleMessageInternal(JmsSendingMessageHandler.java:115) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) 
... 31 more 
Caused by: javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net 
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36) 
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:360) 
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:305) 
at org.apache.activemq.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:245) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) 
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201) 
at com.sun.proxy.$Proxy156.createConnection(Unknown Source) 
at org.springframework.jms.connection.SingleConnectionFactory.doCreateConnection(SingleConnectionFactory.java:365) 
at org.springframework.jms.connection.SingleConnectionFactory.initConnection(SingleConnectionFactory.java:305) 
at org.springframework.jms.connection.SingleConnectionFactory.getConnection(SingleConnectionFactory.java:283) 
at org.springframework.jms.connection.SingleConnectionFactory.createConnection(SingleConnectionFactory.java:224) 
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180) 
at org.springframework.jms.core.JmsTemplate.access$600(JmsTemplate.java:90) 
at org.springframework.jms.core.JmsTemplate$JmsTemplateResourceFactory.createConnection(JmsTemplate.java:1205) 
at org.springframework.jms.connection.ConnectionFactoryUtils.doGetTransactionalSession(ConnectionFactoryUtils.java:312) 
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:480) 
... 36 more 
Caused by: java.net.UnknownHostException: my.url.net 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:589) 
at org.apache.activemq.transport.tcp.TcpTransport.connect(TcpTransport.java:501) 
at org.apache.activemq.transport.tcp.TcpTransport.doStart(TcpTransport.java:464) 
at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55) 
at org.apache.activemq.transport.AbstractInactivityMonitor.start(AbstractInactivityMonitor.java:138) 
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58) 
at org.apache.activemq.transport.WireFormatNegotiator.start(WireFormatNegotiator.java:72) 
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58) 
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58) 
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:340) 
... 54 more 

回答

3

好了,不適合,因爲usingIdCache="true"回滾信息工作。該消息被返回到數據庫,但是它們在下一個poll中沒有被拾取,因爲它們的ID位於內存中idCache

我們提供這一建議得到處理加工或回滾消息:

<int:transaction-synchronization-factory id="syncFactory"> 
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" /> 
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/> 
</int:transaction-synchronization-factory> 

... 

<int:poller id="defaultPoller" default="true" fixed-delay="500"> 
    <int:transactional propagation="REQUIRED" isolation="DEFAULT" 
         synchronization-factory="syncFactory" 
         transaction-manager="transactionManager"/> 
</int:poller> 

隨着該獨立TX的結果消息ID從idCache刪除。在你的情況下,它將準備好再次進行輪詢。

查看更多關於Reference Manual的信息。並且JdbcChannelMessageStore#setUsingIdCache上的JavaDoc顯示相同。

+0

作品,謝謝! –