2013-10-02 67 views
3

我想在Java中實現一個簡單的DAG狀調度(不需要結果),被描述爲下面的圖:如何在Java中實現類DAG調度程序?

DAG-like scheduling

我可以簡單地使用手冊代碼來實現這一點:

ExecutorService executor = Executors.newCachedThreadPool(); 
Future<?> futureA = executor.submit(new Task("A")); 
Future<?> futureC = executor.submit(new Task("C")); 
futureA.get(); 
Future<?> futureB = executor.submit(new Task("B")); 
futureB.get(); 
futureC.get(); 
Future<?> futureD = executor.submit(new Task("D")); 
futureD.get(); 

但是我正在尋找一種更普遍的方式來做到這一點,所以我可以使用這樣的調度:

Container container = new Container(); 
container.addTask("A", new Task("A")); 
container.addTask("B", new Task("B"), "A"); 
container.addTask("C", new Task("C")); 
container.addTask("D", new Task("D"), "B", "C"); 
container.waitForCompletion(); 

一ND其實我已經實現一個簡單的一個:

https://github.com/jizhang/micro-scheduler/blob/master/src/main/java/com/shzhangji/micro_scheduler/App.java

但我需要遍歷所有的任務,每100ms,看看哪一個是準備提交。在這個實現中也沒有異常檢查。

我也檢出Guava lib的ListenableFuture,但我不知道如何正確使用它。

有關如何實施DAG或推薦現有開源調度程序的任何建議,敬請期待。

+0

我只是想同時運行任務'A','B'和'C',然後等待他們完成爲什麼不使用'invokeAll'。 –

+1

@BoristheSpider'B'依賴於'A',並且可能有更復雜的圖。 –

+0

這是一個非線性管道。查找管道模式,有幾個實現它的java庫。 – tom

回答

4

你在找什麼可以使用谷歌的番石榴庫,它是可聽的未來接口。 ListenableFutures使您可以擁有複雜的異步操作鏈。一旦使用allAsList方法完成了任務B和C,您應該實現一個可監聽的將來來執行任務D.

用於收聽期貨的文檔: https://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained

上收聽到期貨的指南: http://www.javacodegeeks.com/2013/02/listenablefuture-in-guava.html

使用allAsList,鏈的一個例子,和變換方法: http://codingjunkie.net/google-guava-futures/

+1

感謝您的文章,我寫這樣一個:https://github.com/jizhang/micro-scheduler/blob/master/src/main/java/com/shzhangji/micro_scheduler/Container.java,它的工作原理。 –

3

Dexecutor(聲明:我是店主)是你正在尋找的圖書館,下面是一個例子

public class WorkFlowManager { 

private final Dexecutor<String, Boolean> dexecutor; 

public WorkFlowManager(ExecutorService executorService) { 
    this.dexecutor = buildDexecutor(executorService); 

    buildGraph(); 
} 

private Dexecutor<String, Boolean> buildDexecutor(final ExecutorService executorService) { 
    DexecutorConfig<String, Boolean> config = new DexecutorConfig<>(executorService, new WorkFlowTaskProvider()); 
    return new DefaultDexecutor<>(config); 
} 

private void buildGraph() { 
    this.dexecutor.addDependency(TaskOne.NAME, TaskTwo.NAME); 
    this.dexecutor.addDependency(TaskTwo.NAME, TaskThree.NAME); 
    this.dexecutor.addDependency(TaskTwo.NAME, TaskFour.NAME); 
    this.dexecutor.addDependency(TaskTwo.NAME, TaskFive.NAME); 
    this.dexecutor.addDependency(TaskFive.NAME, TaskSix.NAME); 
    this.dexecutor.addAsDependentOnAllLeafNodes(TaskSeven.NAME); 
} 

public void execute() { 
    this.dexecutor.execute(ExecutionConfig.TERMINATING); 
} 
} 

這將構建以下圖形,並且執行將相應地進行。 Dexecutor Graph

參考How Do I?更多細節

爲什麼Dexecutor

  • 超輕重量
  • 超快
  • 即時/計劃重試邏輯支持
  • 非終端行爲支持
  • 條件跳過任務執行
  • 良好的測試覆蓋率,讓你免受傷害
  • 可在文檔
  • 分發執行支持的行家中央
  • 良好的金額(點燃,Hazelcast,Infinispan的)

有用的鏈接

0

另一種可能的方向是JavaRed Library其提供用於寫入的接口,並限定異步圖形與同步狀成語流動。

作爲一個簡單的例子,因爲這一個圖形流程複雜: Graph execution flow

可以簡單地實現:

Result<String> aResult = produceFutureOf(String.class).byExecuting(() -> executeA()); 
Result<String> bResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeB(a)); 
Result<String> cResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeC(a)); 
Result<String> eResult = ifResult(aResult).succeed().produceFutureOf(String.class).byExecuting(a -> executeE(a)); 
Result<String> dResult = ifResults(bResult, cResult).succeed().produceFutureOf(String.class).byExecuting((b, c) -> executeD(b, c)); 
Result<String> fResult = ifResult(eResult).succeed().produceFutureOf(String.class).byExecuting(e -> executeF(e)); 
Result<String> gResult = ifResult(fResult).succeed().produceFutureOf(String.class).byExecuting(f -> executeG(f)); 
return ifResults(dResult, gResult).succeed().produceFutureOf(String.class).byExecuting((d, g) -> executeH(d, g)); 

更多關於它的project wiki