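Spring Integration JMS inbound channel adapter not polling at the specified fixed rate
I have defined a JMS inbound channel adapter as follows in Spring Integration 3.0.1.RELEASE: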
<int-jms:inbound-channel-adapter channel="inChannel" phase="1000"
destination-name="jmsQueue" extract-payload="true"
connection-factory="connectionFactory">
<int:poller max-messages-per-poll="1" fixed-rate="1000"/>
</int-jms:inbound-channel-adapter>
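However, messages are consumed from the JMS broker at random, unreliable intervals; the time before a message is consumed can range from a few seconds to several minutes. I have tried fixed-delay instead of fixed-rate, but it shows the same behavior.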
What other factors could cause the poll to run at varying times, and how can I achieve a reliable polling interval?
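To make the fixed-rate / fixed-delay distinction concrete, here is a minimal sketch in plain Java (not Spring Integration code). It is a simplified model under two assumptions: polls never overlap, and the trigger is periodic, so that with fixed-rate the next poll is due one period after the previous scheduled time, while with fixed-delay it is due one period after the previous poll completed. The class name PollScheduleModel, the method startTimes and the sample durations are made up for illustration.

```java
import java.util.Arrays;

class PollScheduleModel {

    /**
     * Returns the start time (in ms) of each poll, given how long each poll takes.
     * Assumption: a poll cannot start before the previous one has finished.
     */
    static long[] startTimes(boolean fixedRate, long periodMs, long[] durationsMs) {
        long[] starts = new long[durationsMs.length];
        long scheduled = 0;    // time at which the trigger wants the poll to run
        long previousEnd = 0;  // time at which the previous poll actually finished
        for (int i = 0; i < durationsMs.length; i++) {
            if (i > 0) {
                // fixed-rate counts from the previous scheduled time,
                // fixed-delay counts from the previous completion time
                scheduled = fixedRate ? scheduled + periodMs : previousEnd + periodMs;
            }
            // an overdue poll starts as soon as the previous one is done
            starts[i] = Math.max(scheduled, previousEnd);
            previousEnd = starts[i] + durationsMs[i];
        }
        return starts;
    }

    public static void main(String[] args) {
        // hypothetical poll durations in ms; the third poll is slow
        long[] durations = {40, 5, 2500, 5};
        // prints "fixed-rate : [0, 1000, 2000, 4500]"
        System.out.println("fixed-rate : " + Arrays.toString(startTimes(true, 1000, durations)));
        // prints "fixed-delay: [0, 1040, 2045, 5545]"
        System.out.println("fixed-delay: " + Arrays.toString(startTimes(false, 1000, durations)));
    }
}
```

In this model a single slow poll (2500 ms here) pushes back the poll that follows it under either setting, so switching between fixed-rate and fixed-delay would not by itself remove the gaps if the poll itself is what takes long.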
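Edit:
I restricted the application to a single default poller with a single JMS inbound channel adapter (there are still some message-driven channel adapters), and it still shows the same behavior. I tweaked the timings to 3000 for fixed-delay and 5000 for receive-timeout.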
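I started the application and queued some messages in the JMS queue. The log shows entries like the following, with the task scheduler switching threads after some of the callback operations: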
2016-09-23 18:48:25,592 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:18:1,started=true}
2016-09-23 18:48:25,630 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false'
2016-09-23 18:48:28,639 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:19:1,started=true}
2016-09-23 18:48:28,643 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false'
2016-09-23 18:48:31,651 | DEBUG | ask-scheduler-3 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:20:1,started=true}
2016-09-23 18:48:31,657 | DEBUG | ask-scheduler-3 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false'
2016-09-23 18:48:34,666 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:21:1,started=true}
2016-09-23 18:48:34,670 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false'
Then, after 10 minutes:
2016-09-23 18:58:10,032 | DEBUG | ask-scheduler-8 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:212:1,started=true}
2016-09-23 18:58:10,091 | DEBUG | ask-scheduler-8 | ion.endpoint.SourcePollingChannelAdapter | Poll resulted in Message:
and the message is consumed.
I have taken several thread dumps and could find only one instance of a task scheduler thread in the RUNNABLE state:
"task-scheduler-4" prio=6 tid=0x000000001074f800 nid=0x4364 runnable [0x000000001d4fe000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:167)
at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237)
- locked <0x00000007dc803080> (a java.util.concurrent.atomic.AtomicBoolean)
at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
- locked <0x00000007dc8031f8> (a java.lang.Object)
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
at org.apache.activemq.ActiveMQSession.doClose(ActiveMQSession.java:590)
at org.apache.activemq.ActiveMQSession.close(ActiveMQSession.java:581)
at org.springframework.jms.support.JmsUtils.closeSession(JmsUtils.java:108)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:497)
at org.springframework.jms.core.JmsTemplate.receiveSelected(JmsTemplate.java:761)
at org.springframework.integration.jms.JmsDestinationPollingSource.doReceiveJmsMessage(JmsDestinationPollingSource.java:118)
at org.springframework.integration.jms.JmsDestinationPollingSource.receive(JmsDestinationPollingSource.java:93)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:111)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:184)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:141)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:273)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
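All the other thread dumps show that every task scheduler thread is either WAITING or TIMED_WAITING, as below (including the dump taken right after the previous one). This is from a dump taken 30 seconds after the last one: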
"task-scheduler-4" prio=6 tid=0x00000000118d3800 nid=0x4abc waiting on condition [0x000000000f8bf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007838d74d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
"task-scheduler-3" prio=6 tid=0x00000000118d4800 nid=0x4f14 waiting on condition [0x000000001ba0f000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000787c10210> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
Any clues?
I edited the post with some logs and thread dumps; I still can't sort it out :/ – gnzlrm
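Hard to say; clearly the poller is busy interacting with ActiveMQ (closing the session). I don't know the ActiveMQ internals, or why that might take a long time. I could imagine sporadic delays if you have huge messages and/or a poor network. The next step I would take in debugging this would be to look at a network trace with WireShark or a similar tool. –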
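Sorry, I was on vacation last week. I can't use WireShark, as I'm in a restricted environment with a limited toolbox :/ One thing I noticed is that, when I specify 'receive-timeout = 5000', the poll operation apparently takes very little time (some polls are reported as 'false' almost immediately), while the operations that report a received message take longer. Isn't it supposed to wait 5 seconds between the start of the callback operation and the result being reported as 'false'? – gnzlrm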