2011-07-26 27 views
14

This question在兩年前被問到,但它提及的資源要麼不是非常有用(恕我直言)或鏈接不再有效。Java:jsr166y的教程/解釋Phaser

必須有一些很好的教程來了解Phaser。我已經閱讀了javadoc,但是我的目光掠過,因爲爲了真正理解javadoc,你必須知道這些類應該如何使用。

任何人有任何建議嗎?

+1

太陽/ Oracle的[教程](http://download.oracle.com/javase/tutorial/essential/併發/ forkjoin.html)總是很不錯。 – toto2

回答

43

對於Phaser我已經回答了幾個問題。看到他們可能有助於理解他們的應用程序。它們在底部相連。但要了解Phaser的工作原理以及爲什麼它的有用性對於瞭解它的解決方案至關重要。

這裏是的CountDownLatch的屬性和的CyclicBarrier

注:

  • 締約方數量是說不同的線程#的另一種方式
  • 不能重用意味着你必須創建一個新的實例 重新使用前的屏障
  • 如果線程可以到達並繼續工作,則障礙可以提前 而不必等待其他人或可以等待所有人線程完成

CountdownLatch

  • 固定數量的參與者
  • 沒有可重用的
  • 推進的(看latch.countDown();推進的 latch.await();必須等待

的CyclicBarrier

  • 定員
  • 沒有推進的

所以CountdownLatch不能重複使用,必須創建一個新的實例每次雙方

  • 可重複使用的,但是是可行的。 CylicBarrier可以重新使用,但所有線程必須等待每一方到達屏障。

    移相

    • 各方
    • 可重複使用的動態數
    • 推進的

    當一個線程想知道的移相器,他們援引phaser.register()當線程到達在他們調用的屏障phaser.arrive()這裏是它可以提前。如果線程想要等待所有註冊任務完成phaser.arriveAndAwaitAdvance()

    還有一個階段的概念,在這個階段中線程可以等待完成其他可能尚未完成的操作。一旦所有線程到達相位器的屏障,就會創建一個新的階段(增量爲1)。

    你可以在我的其他答案看看,也許它會幫助:

    Java ExecutorService: awaitTermination of all recursively created tasks

    Flexible CountDownLatch?

  • 5

    對於Phaser至少,我認爲的JavaDoc提供了一個相當明確的解釋。這是您可以用來同步批處理線程的類,因爲您可以將批處理中的每個線程與Phaser一起註冊,然後使用Phaser使其阻塞,直到批處理中的每個線程都已通知Phaser,此時任何阻塞的線程將開始執行。根據需要/需要,該等待/通知週期可以一遍又一遍地重複。

    他們的示例代碼給出了一個合理的例子(雖然我很討厭他們的2個字符的縮進風格):

    void runTasks(List<Runnable> tasks) { 
        final Phaser phaser = new Phaser(1); // "1" to register self 
        // create and start threads 
        for (final Runnable task : tasks) { 
        phaser.register(); 
        new Thread() { 
         public void run() { 
         phaser.arriveAndAwaitAdvance(); // await all creation 
         task.run(); 
         } 
        }.start(); 
        } 
    
        // allow threads to start and deregister self 
        phaser.arriveAndDeregister(); 
    } 
    

    這就建立了一個Phasertasks.size() + 1註冊計數,併爲每個任務創建新的Thread將阻止,直到Phaser的下一個提前(即已記錄tasks.size() + 1到達的時間),然後運行其相關任務。創建的每個Thread也立即啓動,因此Phasertasks.size()記錄的循環中跳出。

    最終致電phaser.arriveAndDeregister()將記錄最終到達,並且還減少登記計數,使其現在等於tasks.size()。這會導致Phaser前進,這實際上允許所有任務在同一時間開始運行。這可以通過執行以下操作來重複:

    void runTasks(List<Runnable> tasks) { 
        final Phaser phaser = new Phaser(1); // "1" to register self 
        // create and start threads 
        for (final Runnable task : tasks) { 
        phaser.register(); 
        new Thread() { 
         public void run() { 
         while (true) { 
          phaser.arriveAndAwaitAdvance(); // await all creation 
          task.run(); 
         } 
         } 
        }.start(); 
        } 
    
        // allow threads to start and deregister self 
        phaser.arriveAndDeregister(); 
    } 
    

    ...除了添加導致任務重複運行的循環之外,它和以前一樣。因爲每次迭代調用phaser.arriveAndAwaitAdvance(),任務線程的執行將被同步,使得任務0不會開始其第二次迭代,直到每個其他任務完成其第一次迭代並且通知Phaser已準備好開始其第二次迭代。

    如果您正在運行的任務在執行時間上差異很大,並且您希望確保更快的線程不會與較慢的線程不同步,這可能會非常有用。

    對於一個可能的真實世界的應用程序,考慮運行單獨的圖形和物理線程的遊戲。如果圖形線程卡在第6幀上,並且使用Phaser是確保圖形和物理線程始終在第一幀的同一幀上工作的一種可能方法,則不希望物理線程爲第100幀計算數據同時(並且如果一個線程比另一個線程慢得多,則更快的線程正常地產生CPU資源,以便希望較慢的線程可以更快地趕上)。

    +0

    你的第二個代碼清單和第一個一樣... –

    +0

    @Jason - 不是,看起來更仔細。第二個代碼清單在'Thread'內部有一個'while(true){}'循環,緊跟在'public void run()'之後。 – aroth

    +0

    啊。得到它了。如果你引用了javadoc,你應該在處理終止時使用他們的代碼。 –

    1

    Phaser在CyclicBarrier和CountDownLatch的功能上有些類似,但它比兩者都提供了更大的靈活性。

    在CyclicBarrier中,我們使用在構造函數中註冊參與方,但Phaser爲我們提供了隨時註冊和取消註冊的靈活性。 對於登記方,我們可以使用以下任一 -

    • 構造
    • 寄存器或
    • bulkRegister

    對於註銷方,我們可以使用 -

    • arriandDeregister或


    登記冊 再添/註冊新unarrived參加本移相器。它返回

    • 本註冊適用的到達階段編號。
    • 如果移相器已終止,則值爲負數,並且註冊無效。

    如果正在進行onAdvance()方法的調用,那麼返回此方法可能會等待其完成。 如果該移相器有一位父母,並且沒有使用該移相器的註冊方,則該孩子移相器也向其父母註冊。
    程序來演示移相器的使用>

    import java.util.concurrent.Phaser; 
    
    public class PhaserTest { 
    public static void main(String[] args) { 
    
         /*Creates a new phaser with 1 registered unArrived parties 
         * and initial phase number is 0 
         */ 
         Phaser phaser=new Phaser(1); 
         System.out.println("new phaser with 1 registered unArrived parties" 
            + " created and initial phase number is 0 "); 
    
         //Create 3 threads 
         Thread thread1=new Thread(new MyRunnable(phaser,"first"),"Thread-1"); 
         Thread thread2=new Thread(new MyRunnable(phaser,"second"),"Thread-2"); 
         Thread thread3=new Thread(new MyRunnable(phaser,"third"),"Thread-3"); 
    
         System.out.println("\n--------Phaser has started---------------"); 
         //Start 3 threads 
         thread1.start(); 
         thread2.start(); 
         thread3.start(); 
    
         //get current phase 
         int currentPhase=phaser.getPhase(); 
         /*arriveAndAwaitAdvance() will cause thread to wait until current phase 
         * has been completed i.e. until all registered threads 
         * call arriveAndAwaitAdvance() 
         */ 
         phaser.arriveAndAwaitAdvance(); 
         System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------"); 
    
         //------NEXT PHASE BEGINS------ 
    
         currentPhase=phaser.getPhase(); 
         phaser.arriveAndAwaitAdvance(); 
         System.out.println("------Phase-"+currentPhase+" has been COMPLETED----------"); 
    
         /* current thread Arrives and deRegisters from phaser. 
         * DeRegistration reduces the number of parties that may 
         * be required to advance in future phases. 
         */ 
         phaser.arriveAndDeregister(); 
    
         //check whether phaser has been terminated or not. 
         if(phaser.isTerminated()) 
           System.out.println("\nPhaser has been terminated"); 
    
    } 
    } 
    
    
    
    
    
    class MyRunnable implements Runnable{ 
    
    Phaser phaser; 
    
    MyRunnable(Phaser phaser,String name){ 
         this.phaser=phaser; 
         this.phaser.register(); //Registers/Add a new unArrived party to this phaser. 
         System.out.println(name +" - New unarrived party has " 
            + "been registered with phaser"); 
    } 
    
    @Override 
    public void run() { 
         System.out.println(Thread.currentThread().getName() + 
            " - party has arrived and is working in " 
            + "Phase-"+phaser.getPhase()); 
         phaser.arriveAndAwaitAdvance(); 
    
         //Sleep has been used for formatting output 
         try { 
           Thread.sleep(1000); 
         } catch (InterruptedException e) { 
           e.printStackTrace(); 
         } 
    
         //------NEXT PHASE BEGINS------ 
    
         System.out.println(Thread.currentThread().getName() + 
            " - party has arrived and is working in " 
            + "Phase-"+phaser.getPhase()); 
         phaser.arriveAndAwaitAdvance(); 
    
         phaser.arriveAndDeregister(); 
    } 
    

    }


    bulkRegister - 再添各方一批新unarrived各方對這個移相器。它返回

    • 本註冊適用的到達階段編號。
    • 如果移相器已終止,則值爲負數,並且註冊無效。

    如果正在進行onAdvance()方法的調用,那麼返回此方法可能會等待其完成。

    arrivalandderegister- 當前線程(Party)從移相器到達和取消註冊。取消註冊減少了將來可能需要轉入下一階段的參與方數量。

    如果這位移相器有一位父母,並且沒有使用此移相器的註冊方,此孩子移相器也向其父母註冊。 計劃,以證明父母子女移相

    import java.util.concurrent.Phaser; 
    
    public class PhaserParentChildTest { 
    public static void main(String[] args) { 
    
    
        /* 
        * Creates a new phaser with no registered unArrived parties. 
        */ 
        Phaser parentPhaser = new Phaser(); 
    
        /* 
        * Creates a new phaser with the given parent & 
        * no registered unArrived parties. 
        */ 
        Phaser childPhaser = new Phaser(parentPhaser,0); 
    
        childPhaser.register(); 
    
        System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated()); 
        System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated()); 
    
        childPhaser.arriveAndDeregister(); 
        System.out.println("\n--childPhaser has called arriveAndDeregister()-- \n"); 
    
        System.out.println("parentPhaser isTerminated : "+parentPhaser.isTerminated()); 
        System.out.println("childPhaser isTerminated : "+childPhaser.isTerminated()); 
    
    } 
    } 
    

    瞭解更多關於移相器上http://www.javamadesoeasy.com/2015/03/phaser-in-java_21.html