0

Rightnow,我試圖實現一個基於隊列工作者的設計,在那裏我們接收到數百萬隊列中的消息。而且工作人員是有限的,所以我使用以下代碼分配工作給工作人員。我使用的ExecutorService的一樣:隊列和工作者設計的實時執行器池

ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE); 

    while(LISTEN_FOR_MESSAGE_FLAG == true){ 
     Message receivedMessage = sqsClient.receiveMessage(request); 

     if(receivedMessage == null){ 
     Thread.sleep(5000); // sleep for 5 seconds 
     } 
     else { 
     // lock the message for a certain amount of time (60 secs). 
     // Other workers can't receive a message, when it is locked. 
     sqsClient.changeMessageVisibility(receivedMessage, 60); 

     pool.execute(new Task(receivedMessage); // process the message. 
     } 
    } 

我目前使用Amazon SQS隊列。以上代碼存在嚴重問題。它每隔5秒收到一次消息,並使用可見性超時鎖定它們。一旦可見性超時消失,該鎖就會被中斷。這導致工作人員持有不再鎖定的消息。因此,它有重複處理的問題。 注意:Amazon SQS提供了一種擴展可見性超時的方法。

請幫忙,上面的代碼如何寫出來處理這種情況。

回答

1

如果更改

pool.execute(new Task(receivedMessage); 

到:

Future<?> task = pool.submit(new Task(receivedMessage)); 

您可以執行時間用完之前延長了知名度時間的輔助任務。幫助器任務基本上在Future上執行get(),並且超時時間短於60秒鎖定。當發生TimeoutException時,它會延長可見性超時,並開始再次等待Future。

pool.execute(new Runnable() { 
    @Override 
    public void run() { 
     boolean done = false; 
     while (!done) { 
      try { 
       task.get(50, TimeUnit.SECONDS); 
       done = true; 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       done = true; 
      } catch (ExecutionException e) { 
       done = true; 
       // handle e.getCause() 
      } catch (TimeoutException e) { 
       // we need more time 
       sqsClient.changeMessageVisibility(receivedMessage, 60); 
      } 
     } 

    } 
}); 
+0

這是完美的!謝謝。你的代碼如何控制消息的流入?因爲我不想從隊列中提取新任務,如果工作人員已經忙於處理其他任務。 – Kevindra

+1

你可以使用一個'Semaphore'控制許可等於工人數量。在提取新消息之前,主循環獲得來自「Semaphore」的許可。監控及時完成的​​任務可以在監控任務完成時再次發放許可證。事實上,這樣你可能會消除5秒的睡眠。 – bowmore