A 線程不足死鎖發生在普通線程池中,如果池中的所有線程正在等待同一個池中的排隊任務完成。 ForkJoinPool
通過從join()
調用中的其他線程竊取工作而不是簡單地等待來避免此問題。例如:我可以使用ForkJoinPool的工作竊取行爲來避免線程匱乏死鎖嗎?
private static class ForkableTask extends RecursiveTask<Integer> {
private final CyclicBarrier barrier;
ForkableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
protected Integer compute() {
try {
barrier.await();
return 1;
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
@Test
public void testForkJoinPool() throws Exception {
final int parallelism = 4;
final ForkJoinPool pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
for (int i = 0; i < parallelism; ++i) {
forkableTasks.add(new ForkableTask(barrier));
}
int result = pool.invoke(new RecursiveTask<Integer>() {
@Override
protected Integer compute() {
for (ForkableTask task : forkableTasks) {
task.fork();
}
int result = 0;
for (ForkableTask task : forkableTasks) {
result += task.join();
}
return result;
}
});
assertThat(result, equalTo(parallelism));
}
但是用ExecutorService
接口到ForkJoinPool
,工作竊取時,似乎沒有發生。例如:
private static class CallableTask implements Callable<Integer> {
private final CyclicBarrier barrier;
CallableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public Integer call() throws Exception {
barrier.await();
return 1;
}
}
@Test
public void testWorkStealing() throws Exception {
final int parallelism = 4;
final ExecutorService pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
int result = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
// Deadlock in invokeAll(), rather than stealing work
for (Future<Integer> future : pool.invokeAll(callableTasks)) {
result += future.get();
}
return result;
}
}).get();
assertThat(result, equalTo(parallelism));
}
從ForkJoinPool
的實施粗粗一看,所有的常規ExecutorService
API的使用ForkJoinTask
小號來實現,那麼,爲什麼發生死鎖我不知道。
我不認爲偷工作可以避免死鎖。一旦你陷入僵局,就無法取得進展。竊取工作只是通過允許線程在其隊列爲空時從其他隊列中竊取來避免不平衡隊列。 – markspace 2014-10-26 18:14:00
@markspace在'ForkJoinTask'的實現中,'join()'嘗試從deque運行其他作業而不是拖延,這可以避免死鎖。由於'ForkJoinPool.invokeAll()'將'Callable's轉換爲'ForkJoinTask's,我預計它也能工作。 – 2014-10-27 13:53:00