2017-06-07 295 views
0

我正在致力於應用程序從java應用程序執行spark批量應用程序。從Java應用程序啓動並監控Spark應用程序

有一個主線程啓動線程來啓動spark應用程序。它使用zookeeper在將啓動spark應用程序的機器中找到leaderMain方法是這樣的:

public static void main(String[] args) throws IOException { 

     final int id = Integer.valueOf(args[0]); 
     final String zkURL = args[1]; 

     final ExecutorService service = Executors.newSingleThreadExecutor(); 

     final Future<?> status = service.submit(new ProcessNode(id, zkURL)); 
     try { 
      status.get(); 
     } catch (InterruptedException | ExecutionException e) { 
      LOG.fatal(e.getMessage(), e); 
      service.shutdown(); 
     } 

一旦leader選擇,下面的代碼將在其上運行啓動spark應用。

protected Boolean executeCommand() { 
    try { 
     final Runtime rt = Runtime.getRuntime(); 
     final Process proc = rt.exec("sh start-sparkapp.sh"); 
     final int exitVal = proc.waitFor(); 
     BufferedReader buf = new BufferedReader(new InputStreamReader(proc.getInputStream())); 
     String line = ""; 
     while ((line=buf.readLine())!=null) { 
     System.out.println(line); 
     } 

     System.out.println(" commandToExecute exited with code: " + exitVal); 
     proc.destroy(); 

    } catch (final Exception e) { 
     System.out.println("Exception occurred while Launching process : " + e.getMessage()); 
     return Boolean.FALSE; 
    } 
     return Boolean.TRUE; 
} 

但是這開始長時間運行spark工作。所以我相信,只有當spark工作完成時,代碼的下一部分纔會被執行。我的要求是,只要spark應用程序啓動,控制權轉到代碼的下一部分,我在監視相同的spark應用程序的狀態。即我啓動了spark應用程序,並從相同的java應用程序監視spark應用程序的狀態。 假設我有一個方法montior用於監視應用

public String monitor(ApplicationId id) 

任何建議的狀態如何實現這一目標?

+0

標記「apache-spark」是否合理? – suj1th

回答

0

由於您將使用方法public String monitor(ApplicationId id)來監視Spark應用程序,因此我假定您不希望當前線程在使用proc.waitFor()的進程上等待。另外,您不希望將流程的正常輸出打印到控制檯。這兩個操作都會使您的線程在生成的進程中等待。而且,你的監視器方法應該把生成的進程的進程id,而不是spark applicationId作爲輸入。 因此,修改後的代碼可能如下所示:

protected Boolean executeCommand() { 
try { 
    final Runtime rt = Runtime.getRuntime(); 
    final Process proc = rt.exec("sh start-sparkapp.sh"); 

    /* 
    *Call to method monitor(ProcessId id) 
    */ 

    } catch (final Exception e) { 
     System.out.println("Exception occurred while Launching process : " + e.getMessage()); 
     return Boolean.FALSE; 
    } 
    return Boolean.TRUE; 
}