2015-07-20 114 views
8

我正在使用來自亞馬遜SQS隊列的消息。隊列中有數千條消息。當我啓動應用程序(用Spring框架用Java編寫)時,它開始輪詢來自隊列的消息,並在收到500條消息後停止。如果我再次啓動應用程序它將消耗另外500條消息。消費500條消息後亞馬遜SQS java sdk停止

我的代碼是這樣......

連接工廠

@Bean 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactoryActiveMQ() { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory()); 
    factory.setConcurrency("3-15"); 
    factory.setReceiveTimeout(3000L); 
    return factory; 
} 

@Bean(name = "sqsJmsListenerContainerFactory") 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(CustomDestinationResolver resolver) { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
    factory.setConnectionFactory(sqsConnectionFactory()); 
    factory.setConcurrency("3-15"); 
    factory.setReceiveTimeout(3000L); 
    return factory; 
} 

監聽

@JmsListener(containerFactory = "sqsJmsListenerContainerFactory", destination = "sqs.queue") 
public void onMessage(Message message) { 
    //Processing message 
} 

是什麼,我需要在亞馬遜隊列或者連接工廠bean配置?
謝謝:-)

更新:增加線程轉儲

而應用程序是使用消息
使用DefaultMessageListenerContainer在線程轉儲就像

"[email protected]" prio=5 tid=0x18 nid=NA runnable 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1) 
     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
     at java.net.SocketInputStream.read(SocketInputStream.java:170) 
     at java.net.SocketInputStream.read(SocketInputStream.java:141) 
     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
     at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
     - locked <0x2230> (a java.lang.Object) 
     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
     - locked <0x2231> (a sun.security.ssl.AppInputStream) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) 
     at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) 
     at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
     at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260) 
     at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
     at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) 
     at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197) 
     at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271) 
     at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) 
     at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) 
     at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682) 
     at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486) 
     at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685) 
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460) 
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295) 
     at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291) 
     at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:1340) 
     at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessage(AmazonSQSMessagingClientWrapper.java:127) 
     at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.acknowledge(AutoAcknowledger.java:33) 
     at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.notifyMessageReceived(AutoAcknowledger.java:42) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.messageHandler(SQSMessageConsumerPrefetch.java:477) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.receive(SQSMessageConsumerPrefetch.java:410) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumer.receive(SQSMessageConsumer.java:157) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:413) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:293) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033) 
     at java.lang.Thread.run(Thread.java:745) 


ConsumerPrefetchThread在線程轉儲像

"[email protected]" daemon prio=5 tid=0x1b nid=NA runnable 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1) 
     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
     at java.net.SocketInputStream.read(SocketInputStream.java:170) 
     at java.net.SocketInputStream.read(SocketInputStream.java:141) 
     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
     at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
     - locked <0x23a7> (a java.lang.Object) 
     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
     - locked <0x23a8> (a sun.security.ssl.AppInputStream) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) 
     at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) 
     at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
     at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260) 
     at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
     at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) 
     at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197) 
     at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271) 
     at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) 
     at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) 
     at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682) 
     at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486) 
     at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685) 
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460) 
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295) 
     at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291) 
     at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1021) 
     at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:319) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:216) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:180) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

當應用程序停止使用消息
ConsumerPrefetchThread在線程轉儲就像

"[email protected]" prio=5 tid=0x18 nid=NA waiting 
    java.lang.Thread.State: WAITING 
     at java.lang.Object.wait(Object.java:-1) 
     at java.lang.Object.wait(Object.java:502) 
     at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151) 
     at org.apache.activemq.jms.pool.ConnectionPool.createSession(ConnectionPool.java:133) 
     at org.apache.activemq.jms.pool.PooledConnection.createSession(PooledConnection.java:167) 
     at com.ac.jms.senders.AbstractNoResponseSender.request(AbstractNoResponseSender.java:40) 
     at com.ac.mic.listener.AbstractMicQueueListener.onMessage(AbstractMicQueueListener.java:117) 
     at com.ac.mic.listener.MicQueueListener.onMessage(MicQueueListener.java:40) 
     at sun.reflect.GeneratedMethodAccessor240.invoke(Unknown Source:-1) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:185) 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:104) 
     at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:90) 
     at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:66) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:674) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:634) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:605) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033) 
     at java.lang.Thread.run(Thread.java:745) 


ConsumerPrefetchThread在線程轉儲就像

"[email protected]" daemon prio=5 tid=0x1b nid=NA waiting 
    java.lang.Thread.State: WAITING 
     at java.lang.Object.wait(Object.java:-1) 
     at java.lang.Object.wait(Object.java:502) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.waitForPrefetch(SQSMessageConsumerPrefetch.java:273) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:174) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
+0

嘗試在客戶停止時執行線程轉儲,以查看容器線程正在執行的操作。也許它卡在你的代碼中? –

+0

@GaryRussell我添加了線程轉儲。在這個應用程序中,我也使用了ActiveMQ。我正在推動我將從SQS獲得的消息到ActiveMQ隊列。 – Piyush

回答

3

看起來像某種池耗盡你的代碼...

at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151) 
    at org.apache.activemq.jms.pool.ConnectionPool.createSession(ConnectionPool.java:133) 
    at org.apache.activemq.jms.pool.PooledConnection.createSession(PooledConnection.java:167) 
    at com.ac.jms.senders.AbstractNoResponseSender.request(AbstractNoResponseSender.java:40) 
    at com.ac.mic.listener.AbstractMicQueueListener.onMessage(AbstractMicQueueListener.java:117) 
    at com.ac.mic.listener.MicQueueListener.onMessage(MicQueueListener.java:40) 

容器線程卡住,試圖從PooledConnection獲得會話。

也許你沒有返回池的會話?

考慮使用JmsTemplate而不是您自己的代碼與JMS進行對話。它避免了這些問題。

+1

我修復了這個問題。發送消息後,我沒有關閉會話和連接。我學會了如何使用線程轉儲。謝謝@GaryRussell :) – Piyush