我發現了另一種選擇比由@Carlitos方式提出的選項。它包括使用BlockingQueue.offer
直接在隊列中添加任務。我一開始並沒有設法使它工作的唯一原因是我不得不發佈這個問題,但我不知道ThreadPoolExecutor
的默認行爲是在沒有任何線程的情況下啓動的。這些線程將使用線程工廠延遲創建,並可能被刪除/重新填充,具體取決於池的核心和最大大小以及併發提交的任務數量。
由於線程創建是惰性的,因此如果沒有人正在等待從隊列中獲取元素,則我試圖阻止對offer
的調用將失敗,因爲SynchronousQueue.offer
會立即退出。相反,SynchronousQueue.put
會阻塞,直到有人要求從隊列中取出一個項目,如果線程池爲空,則永遠不會發生。
因此,解決方法是強制線程池使用ThreadPoolExecutor.prestartAllCoreThreads
熱切地創建核心線程。我的問題變得相當微不足道。我做了我真正的用例的簡化版本:
import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
public class SimplifiedBuildScheduler {
private static final int MAX_POOL_SIZE = 10;
private static final Random random = new Random();
private static final AtomicLong nextTaskId = new AtomicLong(0);
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
// this is a soft requirement in my system, not a real-time guarantee. See the complete semantics in my question.
long maxBuildTimeInMillis = 50;
// this timeout must be small compared to maxBuildTimeInMillis in order to accurately match the maximum build time
long taskSubmissionTimeoutInMillis = 1;
ThreadPoolExecutor pool = new ThreadPoolExecutor(MAX_POOL_SIZE, MAX_POOL_SIZE, 0, SECONDS, queue);
pool.prestartAllCoreThreads();
Runnable nextTask = makeTask(maxBuildTimeInMillis);
long millisAtStart = System.currentTimeMillis();
while (maxBuildTimeInMillis > System.currentTimeMillis() - millisAtStart) {
boolean submitted = queue.offer(nextTask, taskSubmissionTimeoutInMillis, MILLISECONDS);
if (submitted) {
nextTask = makeTask(maxBuildTimeInMillis);
} else {
System.out.println("Task " + nextTaskId.get() + " was not submitted. " + "It will be rescheduled unless " +
"the max build time has expired");
}
}
System.out.println("Max build time has expired. Stop submitting new tasks and running existing tasks to completion");
pool.shutdown();
pool.awaitTermination(9999999, SECONDS);
}
private static Runnable makeTask(long maxBuildTimeInMillis) {
long sleepTimeInMillis = randomSleepTime(maxBuildTimeInMillis);
long taskId = nextTaskId.getAndIncrement();
return() -> {
try {
System.out.println("Task " + taskId + " sleeping for " + sleepTimeInMillis + " ms");
Thread.sleep(sleepTimeInMillis);
System.out.println("Task " + taskId + " completed !");
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
};
}
private static int randomSleepTime(long maxBuildTimeInMillis) {
// voluntarily make it possible that a task finishes after the max build time is expired
return 1 + random.nextInt(2 * Math.toIntExact(maxBuildTimeInMillis));
}
}
輸出的一個示例如下:
Task 1 was not submitted. It will be rescheduled unless the max build time has expired
Task 0 sleeping for 23 ms
Task 1 sleeping for 26 ms
Task 2 sleeping for 6 ms
Task 3 sleeping for 9 ms
Task 4 sleeping for 75 ms
Task 5 sleeping for 35 ms
Task 6 sleeping for 81 ms
Task 8 was not submitted. It will be rescheduled unless the max build time has expired
Task 8 was not submitted. It will be rescheduled unless the max build time has expired
Task 7 sleeping for 86 ms
Task 8 sleeping for 47 ms
Task 9 sleeping for 40 ms
Task 11 was not submitted. It will be rescheduled unless the max build time has expired
Task 2 completed !
Task 10 sleeping for 76 ms
Task 12 was not submitted. It will be rescheduled unless the max build time has expired
Task 3 completed !
Task 11 sleeping for 31 ms
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 0 completed !
Task 12 sleeping for 7 ms
Task 14 was not submitted. It will be rescheduled unless the max build time has expired
Task 14 was not submitted. It will be rescheduled unless the max build time has expired
Task 1 completed !
Task 13 sleeping for 40 ms
Task 15 was not submitted. It will be rescheduled unless the max build time has expired
Task 12 completed !
Task 14 sleeping for 93 ms
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 5 completed !
Task 15 sleeping for 20 ms
Task 17 was not submitted. It will be rescheduled unless the max build time has expired
Task 17 was not submitted. It will be rescheduled unless the max build time has expired
Task 11 completed !
Task 16 sleeping for 27 ms
Task 18 was not submitted. It will be rescheduled unless the max build time has expired
Task 18 was not submitted. It will be rescheduled unless the max build time has expired
Task 9 completed !
Task 17 sleeping for 95 ms
Task 19 was not submitted. It will be rescheduled unless the max build time has expired
Max build time has expired. Stop submitting new tasks and running existing tasks to completion
Task 8 completed !
Task 15 completed !
Task 13 completed !
Task 16 completed !
Task 4 completed !
Task 6 completed !
Task 10 completed !
Task 7 completed !
Task 14 completed !
Task 17 completed !
你會發現,例如,任務19不改期,因爲在調度程序可以嘗試再次將其提供給隊列之前,最大構建時間已過期。您還可以看到,在最大構建時間過期之前開始的所有正在進行的任務都將運行至完成。
注:正如我在代碼中的註釋指出,最大編譯時間是軟的要求,這意味着它可能無法準確地滿足,我的解決方案確實允許被連提交任務最大構建時間到期後。如果offer
的調用在最大編譯時間過期並在此之後結束之前啓動,則可能發生這種情況。爲了減少發生的可能性,調用offer
時使用的超時遠小於最大構建時間,這一點很重要。在真實的系統中,線程池通常沒有空閒線程,因此這種競爭條件發生的概率非常小,並且在系統發生時沒有不良後果,因爲最大構建時間是盡最大努力嘗試滿足整體運行時間,而不是嚴格而嚴格的約束。
是的,我想這會工作,使用睡眠有點骯髒,但它很簡單,我更喜歡複雜的額外邏輯。事情是,如果我能夠修復我們用當前方法發現的罕見邊緣案例(立即安排所有事務+在超時之後取消所有剩餘的任務),而不會引入更多複雜性,那麼我只會設法讓此更改得到接受。我會考慮你的解決方案,看看它如何適合我的代碼,謝謝。 – Dici
我遇到了這種方法的一個問題:作爲參數傳遞的'Runnable'與提交的參數不完全相同。相反,它被包裝在一個'FutureTask'中,它有一些異常處理邏輯,它可以阻止我自己的工作。你不能用我的問題給出的背景知道,但我將這添加爲未來讀者的警告。我可以通過強烈依賴「SynchronousQueue」的行爲來解決這個問題。我不能分享整個代碼。我會告訴你,如果我達到一個工作解決方案。 – Dici
我設法使它工作。這有點骯髒,但我很高興能夠在改善行爲的同時刪除現有代碼的25%(儘管我失去了一個關於錯誤處理的小功能)。我認爲我喜歡這一切:p。我會接受這個答案。 – Dici