我在Oozie中實現自定義異步操作時遇到問題。我的類從ActionExecutor擴展,並覆蓋方法initActionType,start,end,check,kill和isCompleted。Oozie自定義異步操作
在啓動方法中,我想啓動一個YARN作業,這是通過我的BiohadoopClient類實現的。要撥打電話異步的,我包裹在一個可調用的client.run()方法:
public void start(final Context context, final WorkflowAction action) {
...
Callable<String> biohadoop = new Callable<String>() {
BiohadoopClient client = new BiohadoopClient();
client.run();
}
// submit callable to executor
executor.submit(biohadoop);
// set the start data, according to https://oozie.apache.org/docs/4.0.1/DG_CustomActionExecutor.html
context.setStartData(externalId, callBackUrl, callBackUrl);
...
}
這工作得很好,例如,當我用我的自定義操作在一個叉/加入的方式,對執行動作並行運行。
現在,問題在於,Oozie對於此操作仍處於RUNNING狀態。似乎不可能將其改爲已完成的狀態。 Oozie永遠不會調用check()方法,對於end()方法也是如此。它無助於在Callable(在client.run()完成之後)手動設置context.setExternalStatus(),context.setExecutionData()和context.setEndData()。我也嘗試手動排隊ActionEndXCommand,但沒有運氣。
當我在Callable的start()方法中等待完成時,狀態得到正確更新,但fork/join中的執行不再平行(這似乎是邏輯,因爲執行等待Callable完成)。
How external clients notify Oozie workflow with HTTP callback沒有幫助,因爲使用回調似乎沒有任何變化(嗯,我可以看到它發生在日誌文件中,但除此之外,什麼都沒有......)。此外,答案中提到,SSH操作異步運行,但我還沒有發現這是如何完成的。 Callable中有一些包裝,但最後,Callable的call()方法被直接調用(不提交給Executor)。
到目前爲止,我還沒有找到任何示例如何編寫異步自定義操作。任何人都可以幫我嗎?
感謝
編輯
這裏是initActionType的實現(),啓動(),檢查(),()結束時,可調用的實現可以開始()動作中找到。
將可調用對象提交給start()操作中的執行程序,之後調用其shutdown()方法 - 以便執行程序在Callable完成後關閉。下一步,調用context.setStartData(externalId,callBackUrl,callBackUrl)。
private final AtomicBoolean finished = new AtomicBoolean(false);
public void initActionType() {
super.initActionType();
log.info("initActionType() invoked");
}
public void start(final Context context, final WorkflowAction action)
throws ActionExecutorException {
log.info("start() invoked");
// Get parameters from Node configuration
final String parameter = getParameters(action.getConf());
Callable<String> biohadoop = new Callable<String>() {
@Override
public String call() throws Exception {
log.info("Starting Biohadoop");
// No difference if check() is called manually
// or if the next line is commented out
check(context, action);
BiohadoopClient client = new BiohadoopClient();
client.run(parameter);
log.info("Biohadoop finished");
finished.set(true);
// No difference if check() is called manually
// or if the next line is commented out
check(context, action);
return null;
}
};
ExecutorService executor = Executors.newCachedThreadPool();
biohadoopResult = executor.submit(biohadoop);
executor.shutdown();
String externalId = action.getId();
String callBackUrl = context.getCallbackUrl("finished");
context.setStartData(externalId, callBackUrl, callBackUrl);
}
public void check(final Context context, final WorkflowAction action)
throws ActionExecutorException {
// finished is an AtomicBoolean, that is set to true,
// after Biohadoop has finished (see implementation of Callable)
if (finished.get()) {
log.info("check(Context, WorkflowAction) invoked -
Callable has finished");
context.setExternalStatus(Status.OK.toString());
context.setExecutionData(Status.OK.toString(), null);
} else {
log.info("check(Context, WorkflowAction) invoked");
context.setExternalStatus(Status.RUNNING.toString());
}
}
public void end(Context context, WorkflowAction action)
throws ActionExecutorException {
log.info("end(Context, WorkflowAction) invoked");
context.setEndData(Status.OK, Status.OK.toString());
}
您能說明如何實現check()和initActionType()方法,以及如何在Callable中實現call()方法? – 2014-10-03 19:25:22
@SSaikia_JtheRocker:我添加了實現 – gappc 2014-10-03 21:37:00