2016-11-28 61 views
2

如何使用Apache Apex創建批處理應用程序?如何使用Apex進行批處理?

我發現的所有示例都是流式應用程序,這意味着它們不會結束,我希望我的應用程序在處理完所有數據後關閉它。

謝謝

回答

2

您可以在運行應用程序之前添加退出條件。 例如

public void testMapOperator() throws Exception 
{ 
    LocalMode lma = LocalMode.newInstance(); 
    DAG dag = lma.getDAG(); 

    NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator()); 
    FunctionOperator.MapFunctionOperator<Integer, Integer> mapper 
    = dag.addOperator("mapper", new FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square())); 
    ResultCollector collector = dag.addOperator("collector", new ResultCollector()); 

    dag.addStream("raw numbers", numGen.output, mapper.input); 
    dag.addStream("mapped results", mapper.output, collector.input); 

// Create local cluster 
    LocalMode.Controller lc = lma.getController(); 
    lc.setHeartbeatMonitoringEnabled(false); 

//Condition to exit the application 
    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() 
    { 
    @Override 
    public Boolean call() throws Exception 
    { 
     return TupleCount == NumTuples; 
    } 
    }); 

    lc.run(); 

    Assert.assertEquals(sum, 285); 
} 

完整的代碼參照https://github.com/apache/apex-malhar/blob/master/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java

+0

在運行環境方面,一些更一般的解決方案如何?我想有可能選擇是本地還是集羣環境。 – Krever

4

什麼是您的使用情況?本地支持批處理是在路線圖上,目前正在開展工作。或者,直到那時,一旦確定處理完成,輸入操作員可以發送ShutdownException()信號,並通過DAG傳播並關閉DAG。

讓我們知道您是否需要進一步的細節。

+0

我正在寫幾乎所有開源bigdata處理引擎的比較作爲我的msc論文。我想創建一個頂點批處理部分(與Mapreduce,Flink和Spark一致)。我現在可能會跳過它,繼續進行流比較。 – Krever

+0

當然。使用它的方式是:在你的endWindow()調用中,檢查你的任務是否完成 - 需要一些自定義邏輯。如果你的任務已經完成,調用ShuddownException()並且你的整個管道將關閉。 –