2014-10-03 57 views
1

我在Oozie中實現自定義異步操作時遇到問題。我的類從ActionExecutor擴展,並覆蓋方法initActionType,start,end,check,kill和isCompleted。Oozie自定義異步操作

在啓動方法中,我想啓動一個YARN作業,這是通過我的BiohadoopClient類實現的。要撥打電話異步的,我包裹在一個可調用的client.run()方法:

public void start(final Context context, final WorkflowAction action) { 
... 
    Callable<String> biohadoop = new Callable<String>() { 
    BiohadoopClient client = new BiohadoopClient(); 
    client.run(); 
    } 

    // submit callable to executor 
    executor.submit(biohadoop); 

    // set the start data, according to https://oozie.apache.org/docs/4.0.1/DG_CustomActionExecutor.html 
    context.setStartData(externalId, callBackUrl, callBackUrl); 
... 
} 

這工作得很好,例如,當我用我的自定義操作在一個叉/加入的方式,對執行動作並行運行。

現在,問題在於,Oozie對於此操作仍處於RUNNING狀態。似乎不可能將其改爲已完成的狀態。 Oozie永遠不會調用check()方法,對於end()方法也是如此。它無助於在Callable(在client.run()完成之後)手動設置context.setExternalStatus(),context.setExecutionData()和context.setEndData()。我也嘗試手動排隊ActionEndXCommand,但沒有運氣。

當我在Callable的start()方法中等待完成時,狀態得到正確更新,但fork/join中的執行不再平行(這似乎是邏輯,因爲執行等待Callable完成)。

How external clients notify Oozie workflow with HTTP callback沒有幫助,因爲使用回調似乎沒有任何變化(嗯,我可以看到它發生在日誌文件中,但除此之外,什麼都沒有......)。此外,答案中提到,SSH操作異步運行,但我還沒有發現這是如何完成的。 Callable中有一些包裝,但最後,Callable的call()方法被直接調用(不提交給Executor)。

到目前爲止,我還沒有找到任何示例如何編寫異步自定義操作。任何人都可以幫我嗎?

感謝

編輯

這裏是initActionType的實現(),啓動(),檢查(),()結束時,可調用的實現可以開始()動作中找到。

將可調用對象提交給start()操作中的執行程序,之後調用其shutdown()方法 - 以便執行程序在Callable完成後關閉。下一步,調用context.setStartData(externalId,callBackUrl,callBackUrl)。

private final AtomicBoolean finished = new AtomicBoolean(false); 

public void initActionType() { 
    super.initActionType(); 
    log.info("initActionType() invoked"); 
} 

public void start(final Context context, final WorkflowAction action) 
     throws ActionExecutorException { 
    log.info("start() invoked"); 

    // Get parameters from Node configuration 
    final String parameter = getParameters(action.getConf()); 

    Callable<String> biohadoop = new Callable<String>() { 
     @Override 
     public String call() throws Exception { 
      log.info("Starting Biohadoop"); 

      // No difference if check() is called manually 
      // or if the next line is commented out 
      check(context, action); 

      BiohadoopClient client = new BiohadoopClient(); 
      client.run(parameter); 
      log.info("Biohadoop finished");    

      finished.set(true); 
      // No difference if check() is called manually 
      // or if the next line is commented out 
      check(context, action); 

      return null; 
     } 
    }; 

    ExecutorService executor = Executors.newCachedThreadPool(); 
    biohadoopResult = executor.submit(biohadoop); 
    executor.shutdown(); 

    String externalId = action.getId(); 
    String callBackUrl = context.getCallbackUrl("finished"); 
    context.setStartData(externalId, callBackUrl, callBackUrl); 
} 

public void check(final Context context, final WorkflowAction action) 
     throws ActionExecutorException { 
    // finished is an AtomicBoolean, that is set to true, 
    // after Biohadoop has finished (see implementation of Callable) 
    if (finished.get()) { 
     log.info("check(Context, WorkflowAction) invoked - 
      Callable has finished"); 
     context.setExternalStatus(Status.OK.toString()); 
     context.setExecutionData(Status.OK.toString(), null); 
    } else { 
     log.info("check(Context, WorkflowAction) invoked"); 
     context.setExternalStatus(Status.RUNNING.toString()); 
    } 
} 

public void end(Context context, WorkflowAction action) 
     throws ActionExecutorException { 
    log.info("end(Context, WorkflowAction) invoked"); 
    context.setEndData(Status.OK, Status.OK.toString()); 
} 
+0

您能說明如何實現check()和initActionType()方法,以及如何在Callable中實現call()方法? – 2014-10-03 19:25:22

+0

@SSaikia_JtheRocker:我添加了實現 – gappc 2014-10-03 21:37:00

回答

0

一兩件事 - 我可以看到你關閉你已經提交作業後立即執行者 - executor.shutdown();。這可能會導致問題。你能否試着將這個陳述改爲end()方法?

+0

感謝您的想法。我試過了,但沒有區別。JavaDoc非常清楚如何使用shutdown:'啓動一個有序的關閉,其中執行先前提交的任務,但不會接受任何新任務。如果已關閉,調用沒有其他影響。 此方法不會等待先前提交的任務完成執行。# 也許您有其他想法? – gappc 2014-10-09 14:29:19

+0

@gappc:我懷疑AtomicBoolean變量是不是更新了什麼。請從start()方法中刪除檢查方法語句,然後檢查您在check()中執行的日誌消息是否可見。另外,記錄finished.get()的值。如果您可以使用JUnit測試用例進行測試,情況會好很多。 – 2014-10-10 10:15:18

+0

AtomicBoolean設置正確,我可以從日誌文件中看到,如果我手動調用check()(不同的日誌輸出)。如果我刪除手動檢查()調用,check()和end()根本不會被調用。對於JUnit測試的情況:你是對的:)但是在目前的階段,所需的解決方案根本無法工作,額外的努力並不值得 - 至少在我看來。我從oozie郵件列表中得到了一些答案,我現在正在嘗試,我會告訴你關於進度的信息 – gappc 2014-10-11 13:39:17

0

最後,我沒有找到問題的「真正」解決方案。爲我工作的解決方案是實現一個動作,它使用Java Executor框架並行調用Biohadoop實例。在調用之後,我等待(仍然在動作中)線程完成