1

我目前使用Dataproc的Java客戶端API通過Spring REST服務觸發Spark作業。火花工作的基礎是:在Java中等待Google Dataproc SparkJob的最佳方式是什麼?

  1. 初始化星火
  2. 過程數據
  3. 結果存入一個GS桶以.json文件

我之所以存儲的數據是這樣,當我的Spark Job完成並將結果存儲在JSON文件中,我可以從REST服務讀取存儲的結果。但是,Dataproc的Java Client API只是觸發作業,不會等待作業完成。因此,等待火花作業完成的最佳方式是什麼?我不想使用Object.wait(int time),因爲不同的spark工作將有不同的執行時間。

回答

3

通過Dataproc REST API,在作業上調用GET將返回有關作業狀態的信息。在一般情況下,你可以簡單地有一個輪詢循環:

public static final ImmutableSet<String> TERMINAL_JOB_STATES = 
    ImmutableSet.of("CANCELLED", "DONE", "ERROR"); 

// Initialize this as normal with credentials, setAppName, HttpTransport, etc. 
private Dataproc dataproc; 

public void waitJob(String projectId, String jobId) throws IOException, InterruptedException { 
    Job job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute(); 
    while (!TERMINAL_JOB_STATES.contains(job.getStatus().getState())) { 
    System.out.println("Job not done yet; current state: " + job.getStatus().getState()); 
    Thread.sleep(5000); 
    job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute(); 
    } 
    System.out.println("Job terminated in state: " + job.getStatus().getState()); 
} 

您可能還需要包裝內try/catch報表的情況下,導尿IOException錯誤的.execute()電話是某種短暫的網絡連接錯誤的(任何500 HTTP code錯誤應該只是重試)。您可能還需要最長的等待時間,以防某些事情阻止工作完成,或者您無意中重試了404 not found錯誤。

你也應該能夠檢測到404 not found錯誤IOException;如果您在投票完成之前意外進入並刪除了一項工作,或者如果一個錯誤導致您輸入waitJob呼叫,但呼叫失敗SubmitJob,則會發生這種情況。你應該可以嘗試嘗試去找一個不存在的工作,看看在這種情況下錯誤是什麼樣的,以避免無限循環。

相關問題