2

Synchronous task producer/consumer using ThreadPoolExecutor

I'm trying to find a way to use a ThreadPoolExecutor in the following scenario:

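  • I have a separate thread that produces tasks and submits them to the thread pool.
  • Task submission is synchronous and blocks until the task can be started.
  • At any given time, only a fixed number of tasks can execute in parallel. An unbounded number of concurrently running tasks could exhaust memory.
  • Before submitting a task, the producer thread always checks that some maximum build time has not been exceeded since the first task was submitted. If it has been exceeded, the producer thread shuts down, but any task currently running on the thread pool runs to completion before the application terminates.
  • When the producer thread terminates, there should be no unstarted task left on the thread pool's queue.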

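To give more context: I currently submit all the tasks at once and, once the max build time has expired, cancel all the futures returned by ExecutorService.submit. I ignore every resulting CancellationException, since they are expected. The problem is that the behaviour of Future.cancel(false) is odd and inadequate for my use case: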

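  • It prevents any task that has not started yet from running (good).
  • It does not interrupt tasks that are currently running and lets them run to completion (good).
  • However, it ignores any exception thrown by a currently running task and throws a CancellationException instead, for which Exception.getCause() is null. As a result, I cannot distinguish a task that was cancelled before it ran from a task that kept running after the max build time and then failed with an exception! That is unfortunate, because in that case I would like to propagate the exception and report it to some error handling mechanism.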
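The third point can be reproduced with a small experiment. The following is only a sketch: the class name CancelHidesFailure, the latches and the exception message are made up for illustration and are not part of the real build system.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class CancelHidesFailure {
    /** Cancels a task that is already running, lets it fail, and reports what Future.get() exposes. */
    static String outcome() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch cancelled = new CountDownLatch(1);
        // Hypothetical task: waits until it has been cancelled, then fails.
        Runnable failingTask = () -> {
            started.countDown();
            try {
                cancelled.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("build step failed");
        };
        try {
            Future<?> future = pool.submit(failingTask);
            started.await();         // the task is now running
            future.cancel(false);    // does not interrupt the running task
            cancelled.countDown();   // let the task throw its exception
            try {
                future.get();
                return "completed";
            } catch (CancellationException e) {
                return "cancelled, cause=" + e.getCause();
            } catch (ExecutionException e) {
                return "failed: " + e.getCause().getMessage();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome()); // prints "cancelled, cause=null"
    }
}
```

The IllegalStateException thrown by the task is never reported: once the future is in the cancelled state, Future.get() only ever throws a CancellationException without a cause.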

I looked at the different blocking queues Java offers and found this one: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/SynchronousQueue.html. It seemed ideal, but then, looking at https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html, it does not seem to play with ThreadPoolExecutor the way I would like it to:

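Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.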

What would be ideal is for the consumer (= the pool) to block on SynchronousQueue.poll and for the producer (= the task producer thread) to block on SynchronousQueue.put.

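Any idea how I could implement the scenario I described without writing complex scheduling logic myself (the kind of thing ThreadPoolExecutor should encapsulate for me)?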

Answers

0

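I found another option besides the one proposed by @Carlitos Way. It consists of adding tasks directly to the queue using BlockingQueue.offer. The only reason I did not manage to make it work at first, and had to post this question, is that I did not know that a ThreadPoolExecutor starts without any threads by default. The threads are created lazily using a thread factory and may be removed or recreated depending on the core and maximum sizes of the pool and the number of tasks being submitted concurrently.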

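Since thread creation is lazy, my attempts to block on the call to offer failed, because SynchronousQueue.offer returns immediately if nobody is waiting to take an element from the queue. Conversely, SynchronousQueue.put blocks until someone asks to take an item from the queue, which never happens if the thread pool is empty.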
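The lazy start can be observed directly. The sketch below (the class name LazyPoolDemo and the pool size of 2 are arbitrary choices for illustration) checks the pool size and the result of offer on a fresh pool, then again after the core threads have been started eagerly with ThreadPoolExecutor.prestartAllCoreThreads:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class LazyPoolDemo {
    /** Returns "sizeBefore offeredBefore sizeAfter offeredAfter" as a single string. */
    static String observe() {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, queue);
        try {
            // A fresh pool has no threads, so nobody is polling the queue
            // and the untimed offer fails immediately.
            int sizeBefore = pool.getPoolSize();
            boolean offeredBefore = queue.offer(() -> { });

            // Eagerly create the core threads; they start polling the queue.
            pool.prestartAllCoreThreads();
            int sizeAfter = pool.getPoolSize();
            // A timed offer gives the new workers a moment to reach the queue.
            boolean offeredAfter = queue.offer(() -> { }, 1, TimeUnit.SECONDS);

            return sizeBefore + " " + offeredBefore + " " + sizeAfter + " " + offeredAfter;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe()); // expected: 0 false 2 true
    }
}
```

The timed offer in the second half is deliberate: an untimed offer issued right after prestartAllCoreThreads could still fail if the worker threads have not reached the queue yet.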

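The workaround is therefore to force the thread pool to create its core threads eagerly using ThreadPoolExecutor.prestartAllCoreThreads. With that, my problem becomes fairly trivial. Here is a simplified version of my real use case: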

import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

public class SimplifiedBuildScheduler {
    private static final int MAX_POOL_SIZE = 10;

    private static final Random random = new Random();
    private static final AtomicLong nextTaskId = new AtomicLong(0);

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();

        // this is a soft requirement in my system, not a real-time guarantee. See the complete semantics in my question.
        long maxBuildTimeInMillis = 50;
        // this timeout must be small compared to maxBuildTimeInMillis in order to accurately match the maximum build time
        long taskSubmissionTimeoutInMillis = 1;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(MAX_POOL_SIZE, MAX_POOL_SIZE, 0, SECONDS, queue);
        // without this call the pool has no threads, nobody polls the queue and every offer times out
        pool.prestartAllCoreThreads();

        Runnable nextTask = makeTask(maxBuildTimeInMillis);

        long millisAtStart = System.currentTimeMillis();
        while (maxBuildTimeInMillis > System.currentTimeMillis() - millisAtStart) {
            // hand the task directly to an idle worker, waiting at most taskSubmissionTimeoutInMillis
            boolean submitted = queue.offer(nextTask, taskSubmissionTimeoutInMillis, MILLISECONDS);
            if (submitted) {
                nextTask = makeTask(maxBuildTimeInMillis);
            } else {
                // note: nextTaskId.get() is the ID the next task will receive, i.e. the pending task's ID plus one
                System.out.println("Task " + nextTaskId.get() + " was not submitted. " + "It will be rescheduled unless " +
                        "the max build time has expired");
            }
        }

        System.out.println("Max build time has expired. Stop submitting new tasks and running existing tasks to completion");

        // shutdown() lets tasks that are already running finish
        pool.shutdown();
        pool.awaitTermination(9999999, SECONDS);
    }

    private static Runnable makeTask(long maxBuildTimeInMillis) {
        long sleepTimeInMillis = randomSleepTime(maxBuildTimeInMillis);
        long taskId = nextTaskId.getAndIncrement();
        return () -> {
            try {
                System.out.println("Task " + taskId + " sleeping for " + sleepTimeInMillis + " ms");
                Thread.sleep(sleepTimeInMillis);
                System.out.println("Task " + taskId + " completed !");
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
        };
    }

    private static int randomSleepTime(long maxBuildTimeInMillis) {
        // voluntarily make it possible that a task finishes after the max build time is expired
        return 1 + random.nextInt(2 * Math.toIntExact(maxBuildTimeInMillis));
    }
}

Here is a sample of the output:

Task 1 was not submitted. It will be rescheduled unless the max build time has expired 
Task 0 sleeping for 23 ms 
Task 1 sleeping for 26 ms 
Task 2 sleeping for 6 ms 
Task 3 sleeping for 9 ms 
Task 4 sleeping for 75 ms 
Task 5 sleeping for 35 ms 
Task 6 sleeping for 81 ms 
Task 8 was not submitted. It will be rescheduled unless the max build time has expired 
Task 8 was not submitted. It will be rescheduled unless the max build time has expired 
Task 7 sleeping for 86 ms 
Task 8 sleeping for 47 ms 
Task 9 sleeping for 40 ms 
Task 11 was not submitted. It will be rescheduled unless the max build time has expired 
Task 2 completed ! 
Task 10 sleeping for 76 ms 
Task 12 was not submitted. It will be rescheduled unless the max build time has expired 
Task 3 completed ! 
Task 11 sleeping for 31 ms 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 0 completed ! 
Task 12 sleeping for 7 ms 
Task 14 was not submitted. It will be rescheduled unless the max build time has expired 
Task 14 was not submitted. It will be rescheduled unless the max build time has expired 
Task 1 completed ! 
Task 13 sleeping for 40 ms 
Task 15 was not submitted. It will be rescheduled unless the max build time has expired 
Task 12 completed ! 
Task 14 sleeping for 93 ms 
Task 16 was not submitted. It will be rescheduled unless the max build time has expired 
Task 16 was not submitted. It will be rescheduled unless the max build time has expired 
Task 16 was not submitted. It will be rescheduled unless the max build time has expired 
Task 5 completed ! 
Task 15 sleeping for 20 ms 
Task 17 was not submitted. It will be rescheduled unless the max build time has expired 
Task 17 was not submitted. It will be rescheduled unless the max build time has expired 
Task 11 completed ! 
Task 16 sleeping for 27 ms 
Task 18 was not submitted. It will be rescheduled unless the max build time has expired 
Task 18 was not submitted. It will be rescheduled unless the max build time has expired 
Task 9 completed ! 
Task 17 sleeping for 95 ms 
Task 19 was not submitted. It will be rescheduled unless the max build time has expired 
Max build time has expired. Stop submitting new tasks and running existing tasks to completion 
Task 8 completed ! 
Task 15 completed ! 
Task 13 completed ! 
Task 16 completed ! 
Task 4 completed ! 
Task 6 completed ! 
Task 10 completed ! 
Task 7 completed ! 
Task 14 completed ! 
Task 17 completed ! 

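You will notice, for example, that the last task that could not be submitted (logged as Task 19) is not rescheduled, because the max build time expired before the scheduler could try to offer it to the queue again. You can also see that all the in-flight tasks that started before the max build time expired run to completion.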

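Note: as I pointed out in the comments in the code, the max build time is a soft requirement, which means it may not be met exactly, and my solution does allow a task to be submitted even after the max build time has expired. This can happen if a call to offer starts before the max build time expires and finishes after it. To reduce the odds of that happening, it is important that the timeout used in the call to offer is much smaller than the max build time. In the real system, the thread pool usually has no idle thread, so the probability of this race condition occurring is very small, and it has no bad consequences when it does occur, because the max build time is a best-effort attempt to bound the overall running time rather than a hard, strict constraint.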
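If the window for this race needs to be narrowed further, one optional refinement (not part of the solution above, just a sketch) is to never wait in offer for longer than the build time that remains. The helper name cappedTimeoutMillis and its parameters are hypothetical:

```java
final class OfferTimeouts {
    /**
     * Returns the timeout to pass to the timed offer: the configured submission timeout,
     * capped by the time left before the max build time expires, and never negative.
     */
    static long cappedTimeoutMillis(long millisAtStart, long nowMillis,
                                    long maxBuildTimeInMillis, long taskSubmissionTimeoutInMillis) {
        long remainingMillis = maxBuildTimeInMillis - (nowMillis - millisAtStart);
        return Math.max(0, Math.min(taskSubmissionTimeoutInMillis, remainingMillis));
    }

    public static void main(String[] args) {
        // 49 ms into a 50 ms build with a 5 ms submission timeout: wait at most 1 ms
        System.out.println(cappedTimeoutMillis(0, 49, 50, 5)); // prints 1
    }
}
```

This only shrinks the overshoot; a task handed over at the very last moment still starts, which is consistent with the max build time being a soft requirement.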

1

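I believe you are on the right track... All you have to do is use a SynchronousQueue in combination with a RejectedExecutionHandler, using the ThreadPoolExecutor constructor that accepts a handler... That way you can define a thread pool with a fixed maximum size (limiting your resource usage) and define a fallback mechanism to reschedule the tasks that cannot be processed (because the pool is full)... For example: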

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Experiment {

    public static final long HANDLER_SLEEP_TIME = 4000;
    public static final int MAX_POOL_SIZE = 1;

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Runnable> queue;
        RejectedExecutionHandler handler;
        ThreadPoolExecutor pool;
        Runnable runA, runB;

        queue = new SynchronousQueue<>();
        handler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    System.out.println("Handler invoked! Thread: " + Thread.currentThread().getName());
                    Thread.sleep(HANDLER_SLEEP_TIME); // this lets runnable A finish
                    // reschedule; r is already the FutureTask created by pool.submit,
                    // so execute avoids wrapping it a second time
                    executor.execute(r);

                } catch (InterruptedException ex) {
                    throw new RuntimeException("Handler Exception!", ex);
                }
            }
        };

        pool = new ThreadPoolExecutor(1, MAX_POOL_SIZE, 10, TimeUnit.SECONDS, queue, handler);
        runA = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    System.out.println("hello, I'm runnable A");

                } catch (Exception ex) {
                    throw new RuntimeException("RunnableA", ex);
                }
            }
        };
        runB = new Runnable() {
            @Override
            public void run() {
                System.out.println("hello, I'm runnable B");
            }
        };

        pool.submit(runA);
        pool.submit(runB); // rejected while runA occupies the only thread, so the handler runs in the calling thread
        pool.shutdown();
    }
}

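Note: the implementation of the RejectedExecutionHandler is up to you! I only suggested a sleep as the blocking mechanism, but you can use more elaborate logic, such as asking the thread pool whether it has idle threads. If not, sleep; if so, submit the task again...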
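As one possible variant of such a handler, here is a sketch that blocks the submitting thread on the pool's own SynchronousQueue instead of sleeping for a fixed time. The class name BlockingHandlerDemo, the 20 ms task and the pool size of 1 are assumptions made for the example. Note that the ThreadPoolExecutor documentation describes getQueue() as intended primarily for debugging and monitoring, so this relies on the handoff behaviour of SynchronousQueue rather than on a documented contract of the pool.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BlockingHandlerDemo {
    /** Submits taskCount short tasks to a single-thread pool and returns how many completed. */
    static int runTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        // Hypothetical handler: block the caller until a worker takes the task from the queue.
        RejectedExecutionHandler blockUntilHandedOff = (task, executor) -> {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("pool is shut down");
            }
            try {
                executor.getQueue().put(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for a free worker", e);
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), blockUntilHandedOff);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(20); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    completed.incrementAndGet();
                });
            }
        } finally {
            pool.shutdown(); // tasks already handed to a worker still run to completion
        }
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3)); // prints 3
    }
}
```

With this handler the producer never has more than one task waiting, and that task is handed over as soon as a worker becomes free, without guessing a sleep duration.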

+0

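Yes, I guess this would work. Using a sleep is a bit dirty, but it is simple, and I prefer that to complex extra logic. The thing is, I will only manage to get this change accepted if I can fix the rare edge case we found with the current approach (scheduling everything at once + cancelling all remaining tasks after the timeout) without introducing more complexity. I will consider your solution and see how it fits into my code, thanks. – Dici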

+1

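I ran into one issue with this approach: the 'Runnable' passed as a parameter to the handler is not exactly the one that was submitted. Instead, it is wrapped in a 'FutureTask', which has some exception handling logic that gets in the way of my own. You could not have known this from the context given in my question, but I am adding it as a warning for future readers. I can work around it by relying heavily on the behaviour of 'SynchronousQueue'. I cannot share the whole code. I will let you know if I reach a working solution. – Dici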

+1

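I managed to make it work. It is a bit dirty, but I am happy to be able to remove 25% of the existing code while improving the behaviour (although I lose a small feature regarding error handling). I think I like it all the same :p. I will accept this answer. – Dici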
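For future readers, the 'FutureTask' wrapping mentioned in the comments above can be checked with a small experiment. This is only a sketch with made-up names (WrappedRunnableDemo, a recording handler); it compares what the handler receives after submit and after execute:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class WrappedRunnableDemo {
    /** Returns "submitWraps executeKeepsIdentity" as a single string. */
    static String observe() {
        AtomicReference<Runnable> rejected = new AtomicReference<>();
        CountDownLatch release = new CountDownLatch(1);
        // The handler only records what it is given; it does not reschedule anything.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>(),
                (r, executor) -> rejected.set(r));
        try {
            // Occupy the only worker so that every further task is rejected.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            Runnable original = () -> { };

            pool.submit(original);   // submit wraps the Runnable before handing it to execute
            boolean submitWraps = rejected.get() instanceof FutureTask && rejected.get() != original;

            pool.execute(original);  // execute passes the Runnable through unchanged
            boolean executeKeepsIdentity = rejected.get() == original;

            return submitWraps + " " + executeKeepsIdentity;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints "true true"
    }
}
```

So a handler that needs the original 'Runnable' (and its own exception handling) has to be fed through execute, or through the queue directly, rather than through submit.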