0
Rightnow,我試圖實現一個基於隊列工作者的設計,在那裏我們接收到數百萬隊列中的消息。而且工作人員是有限的,所以我使用以下代碼分配工作給工作人員。我使用的ExecutorService的一樣:隊列和工作者設計的實時執行器池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
while(LISTEN_FOR_MESSAGE_FLAG == true){
Message receivedMessage = sqsClient.receiveMessage(request);
if(receivedMessage == null){
Thread.sleep(5000); // sleep for 5 seconds
}
else {
// lock the message for a certain amount of time (60 secs).
// Other workers can't receive a message, when it is locked.
sqsClient.changeMessageVisibility(receivedMessage, 60);
pool.execute(new Task(receivedMessage); // process the message.
}
}
我目前使用Amazon SQS隊列。以上代碼存在嚴重問題。它每隔5秒收到一次消息,並使用可見性超時鎖定它們。一旦可見性超時消失,該鎖就會被中斷。這導致工作人員持有不再鎖定的消息。因此,它有重複處理的問題。 注意:Amazon SQS提供了一種擴展可見性超時的方法。
請幫忙,上面的代碼如何寫出來處理這種情況。
這是完美的!謝謝。你的代碼如何控制消息的流入?因爲我不想從隊列中提取新任務,如果工作人員已經忙於處理其他任務。 – Kevindra
你可以使用一個'Semaphore'控制許可等於工人數量。在提取新消息之前,主循環獲得來自「Semaphore」的許可。監控及時完成的任務可以在監控任務完成時再次發放許可證。事實上,這樣你可能會消除5秒的睡眠。 – bowmore