我們正在使用Spring Cloud AWS與SQS進行交互。我們使用@SqsListener
註釋將消息從隊列中拉出。我們有deletionPolicy = NEVER
,這意味着我們手動確認所有我們選中的郵件。春季雲SQS消費阻塞,直到所有消息處理
我們的問題是SimpleMessageListenerContainer
(它處理隊列中的消息處理)等待所有工作線程完成,然後再從隊列中選擇更多的消息。
換句話說,我們現在看到的是這樣的:
- 拉10個關閉消息隊列。
- 啓動10個線程來完成工作。
- 其中一個正在執行工作的線程在IO調用緩慢時被阻塞。
- 現在阻止應用程序從隊列中獲取更多消息,並且阻止應用程序執行更多工作,直到慢速呼叫結束。
我們可以看到在SimpleMessageListenerContainer.AsynchronousMessageListener
代碼負責此
@Override
public void run() {
while (isQueueRunning()) {
try {
ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
for (Message message : receiveMessageResult.getMessages()) {
if (isQueueRunning()) {
MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
} else {
messageBatchLatch.countDown();
}
}
try {
messageBatchLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} catch (Exception e) {
getLogger().warn("An Exception occurred while polling queue '{}'. The failing operation will be " +
"retried in {} milliseconds", this.logicalQueueName, getBackOffTime(), e);
try {
//noinspection BusyWait
Thread.sleep(getBackOffTime());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
}
理想情況下,我們希望對消息監聽器持續回暖的消息從隊列中進行處理。
由於AbstractMessageListenerContainer
是本地包,我們似乎無法實現我們自己的MessageListenerContainer
。
是否有解決此問題的方法?
謝謝,但我們將如何重寫「AsynchronousMessageListener」?這是私人的.. – GBC
對不起,沒有意識到這不是你的代碼: - (...嗯,在這種情況下,假設春天不允許你覆蓋這種行爲,我想它會更容易實現你自己的隊列輪詢代碼... – Filipe
你可以重載'startQueue(String queueName,QueueAttributes queueAttributes)',對吧? – skirsch