11

A 線程不足死鎖發生在普通線程池中,如果池中的所有線程正在等待同一個池中的排隊任務完成。 ForkJoinPool通過從join()調用中的其他線程竊取工作而不是簡單地等待來避免此問題。例如:我可以使用ForkJoinPool的工作竊取行爲來避免線程匱乏死鎖嗎?

private static class ForkableTask extends RecursiveTask<Integer> { 
    private final CyclicBarrier barrier; 

    ForkableTask(CyclicBarrier barrier) { 
     this.barrier = barrier; 
    } 

    @Override 
    protected Integer compute() { 
     try { 
      barrier.await(); 
      return 1; 
     } catch (InterruptedException | BrokenBarrierException e) { 
      throw new RuntimeException(e); 
     } 
    } 
} 

@Test 
public void testForkJoinPool() throws Exception { 
    final int parallelism = 4; 
    final ForkJoinPool pool = new ForkJoinPool(parallelism); 
    final CyclicBarrier barrier = new CyclicBarrier(parallelism); 

    final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism); 
    for (int i = 0; i < parallelism; ++i) { 
     forkableTasks.add(new ForkableTask(barrier)); 
    } 

    int result = pool.invoke(new RecursiveTask<Integer>() { 
     @Override 
     protected Integer compute() { 
      for (ForkableTask task : forkableTasks) { 
       task.fork(); 
      } 

      int result = 0; 
      for (ForkableTask task : forkableTasks) { 
       result += task.join(); 
      } 
      return result; 
     } 
    }); 
    assertThat(result, equalTo(parallelism)); 
} 

但是用ExecutorService接口到ForkJoinPool,工作竊取時,似乎沒有發生。例如:

private static class CallableTask implements Callable<Integer> { 
    private final CyclicBarrier barrier; 

    CallableTask(CyclicBarrier barrier) { 
     this.barrier = barrier; 
    } 

    @Override 
    public Integer call() throws Exception { 
     barrier.await(); 
     return 1; 
    } 
} 

@Test 
public void testWorkStealing() throws Exception { 
    final int parallelism = 4; 
    final ExecutorService pool = new ForkJoinPool(parallelism); 
    final CyclicBarrier barrier = new CyclicBarrier(parallelism); 

    final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier)); 
    int result = pool.submit(new Callable<Integer>() { 
     @Override 
     public Integer call() throws Exception { 
      int result = 0; 
      // Deadlock in invokeAll(), rather than stealing work 
      for (Future<Integer> future : pool.invokeAll(callableTasks)) { 
       result += future.get(); 
      } 
      return result; 
     } 
    }).get(); 
    assertThat(result, equalTo(parallelism)); 
} 

ForkJoinPool的實施粗粗一看,所有的常規ExecutorService API的使用ForkJoinTask小號來實現,那麼,爲什麼發生死鎖我不知道。

+2

我不認爲偷工作可以避免死鎖。一旦你陷入僵局,就無法取得進展。竊取工作只是通過允許線程在其隊列爲空時從其他隊列中竊取來避免不平衡隊列。 – markspace 2014-10-26 18:14:00

+0

@markspace在'ForkJoinTask'的實現中,'join()'嘗試從deque運行其他作業而不是拖延,這可以避免死鎖。由於'ForkJoinPool.invokeAll()'將'Callable's轉換爲'ForkJoinTask's,我預計它也能工作。 – 2014-10-27 13:53:00

回答

20

你差不多回答你自己的問題。解決方法是「ForkJoinPool通過從join()調用內部的其他線程竊取工作來避免此問題」。每當線程由於除ForkJoinPool.join()之外的其他原因而被阻塞時,不會發生此工作竊取,並且線程僅等待並且什麼都不做。

原因是在Java中,ForkJoinPool不可能阻止其線程阻塞,而是給他們另外的工作。線程本身需要避免阻塞,而是要求池應該做的工作。這隻在ForkJoinTask.join()方法中實現,而不是在其他任何阻塞方法中實現。如果在ForkJoinPool中使用Future,則還會看到飢餓死鎖。

爲什麼工作竊取僅在ForkJoinTask.join()中實現,而不是在Java API中的任何其他阻止方法中實現?那麼,有很多這樣的阻塞方法(Object.wait(),Future.get()java.util.concurrent中的任何併發原語,I/O方法等),它們與ForkJoinPool無關,它只是API中的一個任意類,所以添加所有這些方法的特例都是不好的設計。這也會導致可能非常令人驚訝和不期望的影響。想象一下,例如,用戶將任務傳遞給ExecutorService,等待Future,然後發現該任務在Future.get()中掛起的時間很長,這是因爲正在運行的線程偷了其他一些(長時間運行的)工作項而不是等待Future並在結果可用後立即繼續。一旦一個線程開始處理另一個任務,它將無法返回到原始任務,直到第二個任務完成。因此,其他阻止方法不會偷盜工作實際上是一件好事。對於ForkJoinTask,這個問題並不存在,因爲重要的是儘快完成主要任務,所有任務一起儘可能有效地處理是非常重要的。

ForkJoinPool內偷工作也不可能實現你自己的方法,因爲所有相關部分都不公開。

但是,實際上有第二種方法可以防止飢餓死鎖。這被稱爲管理阻止。它不使用工作竊取(以避免上述問題),而且還需要將要阻塞的線程積極配合線程池。通過管理阻塞,線程通知線程池它可能被阻塞之前它調用潛在的阻塞方法,並且在阻塞方法完成時通知池。線程池然後知道有一個飢餓死鎖的風險,並且如果其所有線程當前處於某個阻塞操作並且還有其他任務要執行,則可能會產生其他線程。請注意,由於額外線程的開銷,這比盜取工作效率低。如果你實現了一個具有普通期貨和管理式阻塞的遞歸併行算法,而不是ForkJoinTask和偷工作,附加線程的數量可能會變得非常大(因爲在算法的「分裂」階段,將創建大量任務並且給予立即阻塞並等待子任務結果的線程)。然而,飢餓死鎖仍然被阻止,並且它避免了任務由於其線程同時開始另一個任務而必須等待很長時間的問題。

Java的ForkJoinPool也支持管理阻塞。爲了使用它,需要實現接口ForkJoinPool.ManagedBlocker,以便任務想要執行的潛在阻塞方法從此接口的block方法中調用。然後該任務可能不會直接調用阻塞方法,而是需要調用靜態方法ForkJoinPool.managedBlock(ManagedBlocker)。該方法在阻塞之前和之後處理與線程池的通信。如果當前任務沒有在ForkJoinPool內執行,它也可以工作,然後它只是調用阻塞方法。

我在Java API(用於Java 7)中發現的實際使用託管阻塞的唯一地方是類Phaser。 (這個類是像互斥鎖和鎖存器這樣的同步障礙,但更靈活和更強大。)因此,與ForkJoinPool任務中的Phaser同步應該使用受管阻塞,並且可以避免捱餓死鎖(但ForkJoinTask.join()仍然是可取的,因爲它使用工作竊取而不是管理阻止)。無論您是直接使用ForkJoinPool還是通過ExecutorService接口,這都可以工作。但是,如果您使用類別Executors創建的其他ExecutorService,則它不起作用,因爲它們不支持受管理阻止。

在斯卡拉,受管阻塞的使用更廣泛(description,API)。

+2

感謝您的回答,非常全面。儘管一個挑剔,'ForkJoinTask'實現在'get()'中執行與在join()中執行相同的操作。我的問題中的死鎖主要來自嘗試同步沒有'ForkJoinPool.managedBlock()'(實際上,這兩個例子在Java 7上的死鎖)。改用'Phaser's,兩者都可以工作。 – 2014-10-28 21:29:13

+0

也許你可以澄清[Java的線程的阻塞狀態](http://docs.oracle.com/javase/7/docs/api/java/lang/Thread.State.html)如何對應於現代中的阻塞線程斯卡拉感。每個阻塞操作是否按照該鏈接上描述的方式等待Java監視器鎖定?或者線程池是否以其他方式跟蹤阻塞狀態? – matanster 2015-03-16 10:11:18

+0

@matt我不確定你的意思是「阻塞現代斯卡拉意義上的線程」。 State.BLOCKED'和管理阻塞之間'的連接僅僅是一個知道很快,這可能是在BLOCKED狀態的線程應該調用'managedBlock'告訴ForkJoinPool這個前期。 ForkJoinPool不會檢測線程當前是否被阻塞。如果一個線程使用'managedBlock'但實際上並未被阻塞,那麼ForkJoinPool仍然會增加線程的數量。 – 2015-03-16 16:04:06

0

我明白你在做什麼,但我不知道爲什麼。屏障的思想是獨立的線程可以等待對方達到共同點。你沒有獨立的線程。線程池,F/J,是Data Parallelism

你正在做的事情更加切合Task Parallelism

原因F /Ĵ繼續是框架創建「延續線程」繼續從雙端獲取工作的時候所有工作線程正在等待。

+0

障礙只是爲了確保每個任務都安排在單獨的線程上。我不認爲「延續線程」是答案,如果你打印出'Thread.currentThread()。getId()',你會看到其中一個'ForkableTasks'運行在與之相同的線程中調用剩下的部分,並且總共只使用4個線程。 – 2014-10-26 22:35:16

+0

您不能保證哪個線程處理盜用工作的任務。所有任務轉儲到同一個提交隊列中。取決於您使用哪個版本(Java7/8),將該塊的工作線程替換爲「繼續」或「補償」線程。你所做的並不是F/J(數據並行性)的優點。 – edharned 2014-10-27 14:26:15

0

當使用線程池的線程和任務數量有限(例如通過future.get())時,總會有線程匱乏的可能性。可以使用無限制的線程池(並準備好OutOfMemoryError),或者通過將阻塞任務分解爲解鎖部件,並在滿足所需條件時激活,使用非阻塞任務。 Future類不能執行此操作(激活任務),但Java8中的CompletableFuture可以。還可以看到許多演員和數據流庫,例如我df4j2

+0

傳統的線程池確實如此。但'ForkJoinPool'實現'ForkJoinTask'上的'ExecutorService' API,所以我期望它的行爲匹配。 – 2014-10-27 13:47:29