2
A
回答
2
您可以在運行應用程序之前添加退出條件。 例如
public void testMapOperator() throws Exception
{
LocalMode lma = LocalMode.newInstance();
DAG dag = lma.getDAG();
NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator());
FunctionOperator.MapFunctionOperator<Integer, Integer> mapper
= dag.addOperator("mapper", new FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square()));
ResultCollector collector = dag.addOperator("collector", new ResultCollector());
dag.addStream("raw numbers", numGen.output, mapper.input);
dag.addStream("mapped results", mapper.output, collector.input);
// Create local cluster
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
//Condition to exit the application
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return TupleCount == NumTuples;
}
});
lc.run();
Assert.assertEquals(sum, 285);
}
4
什麼是您的使用情況?本地支持批處理是在路線圖上,目前正在開展工作。或者,直到那時,一旦確定處理完成,輸入操作員可以發送ShutdownException()信號,並通過DAG傳播並關閉DAG。
讓我們知道您是否需要進一步的細節。
+0
我正在寫幾乎所有開源bigdata處理引擎的比較作爲我的msc論文。我想創建一個頂點批處理部分(與Mapreduce,Flink和Spark一致)。我現在可能會跳過它,繼續進行流比較。 – Krever
+0
當然。使用它的方式是:在你的endWindow()調用中,檢查你的任務是否完成 - 需要一些自定義邏輯。如果你的任務已經完成,調用ShuddownException()並且你的整個管道將關閉。 –
相關問題
- 1. 如何在MSBuild中進行批處理?
- 2. 如何使用批處理
- 3. 如何使用批處理
- 4. 如何使用批處理
- 5. 如何使用批處理
- 6. 如何使用批處理
- 7. 如何使用批處理
- 8. 如何使用DynamoDBContext對結果進行批處理
- 9. Tensorflow-如何使用MNIST數據集進行全批處理?
- 10. 如何運行批處理?
- 11. 如何運行批處理批
- 12. 使用批處理程序進行地理編碼
- 13. 如何從批處理腳本中運行批處理腳本?
- 14. 使用批處理多進程
- 15. Apex批處理不執行所有記錄
- 16. 批處理類Apex中的執行混亂
- 17. 使用進程和運行時類執行批處理文件
- 18. 使用批處理
- 19. 使用批處理
- 20. 使用批處理
- 21. 使用批處理
- 22. 使用批處理
- 23. 使用批處理
- 24. 使用批處理
- 25. 使用批處理
- 26. 用參數執行批處理使用進程
- 27. 如何使用Hibernate批處理
- 28. 如何使用Windows批處理
- 29. 如何使用webgl實現批處理?
- 30. 如何使用tf.cond批處理
在運行環境方面,一些更一般的解決方案如何?我想有可能選擇是本地還是集羣環境。 – Krever