2017-02-14 101 views
9

考慮一下我在Spark中的工作如下;如何知道Apache Spark中當前正在運行哪個階段的工作?

CSV文件 ==>過濾用一個柱 ==>以樣品 ==>另存爲JSON

現在我的要求就是我怎麼知道哪些步驟(擷取文件Filtering or 取樣)當前正在以編程方式執行(最好使用Java API)?有沒有辦法呢?

我可以跟蹤作業,舞臺和任務使用SparkListener類。它可以像跟蹤階段ID一樣完成。但是如何知道哪個階段的Id是工作鏈中的哪一步。

我想發送通知給用戶時,考慮按列過濾完成。爲此,我創建了一個擴展SparkListener類的類。但是我無法從中找到當前正在執行的轉換名稱的名稱。是否有可能跟蹤?

public class ProgressListener extends SparkListener{ 

    @Override 
    public void onJobStart(SparkListenerJobStart jobStart) 
    { 

    } 

    @Override 
    public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) 
    { 
     //System.out.println("Stage Name : "+stageSubmitted.stageInfo().getStatusString()); giving action name only 
    } 

    @Override 
    public void onTaskStart(SparkListenerTaskStart taskStart) 
    { 
     //no such method like taskStart.name() 
    } 
} 
+1

關閉註釋看起來不太合適:這當然是一個編程相關的問題,它在寬度/範圍內似乎也是合理的。 – javadba

回答

3

您無法準確知道何時(例如)過濾器操作開始或結束。

這是因爲您有轉換(filter,map,...)和操作(count,foreach,...)。 Spark將盡可能多的操作放到一個階段。然後在您輸入的不同分區上並行執行舞臺。問題來了。

假設你有幾個工人和下面的程序

LOAD ==>地圖==>過濾==> GROUP BY +聚合

這一方案將可能有兩個階段:第一階段將加載文件並應用mapfilter。 然後輸出將被混洗以創建組。在第二階段,將執行聚合。

現在,問題是,你有幾個工人,每個人都會並行處理一部分輸入數據。也就是說,羣集中的每個執行程序都會收到程序的副本(當前階段)並在分配的分區上執行此操作。

您會看到,您將有多個並行執行的mapfilter運算符的實例,但不一定同時執行。在極端情況下,工人1將在工人20開始工作之前完成階段1(並且因此在工人20之前完成其filter操作)。

對於RDD Spark在舞臺中使用iterator model。但是,對於最新的Spark版本中的數據集,它們會在分區上創建一個循環並執行轉換。這意味着在這種情況下,Spark本身並不知道轉換操作符何時完成單個任務!

長話短說:

  1. 你是不是能夠在知道當一個階段內的操作完成
  2. ,即使你可以有多個實例,將在不同的時間完成。

所以,現在我已經有同樣的問題:

在我們Piglet project(請允許一些adverstisement ;-)),我們生成的Pig Latin腳本星火代碼,並希望配置文件的腳本。我最終在所有用戶操作員之間插入mapPartition運營商,該用戶運營商將發送分區ID和當前時間給將評估消息的服務器。但是,這個解決方案也有其侷限性......我還沒有完全滿意。

但是,除非你能夠修改程序,恐怕你不能達到你想要的。

0

你考慮這個選項:http://spark.apache.org/docs/latest/monitoring.html
看來你可以使用下面的REST API,以獲得一定的工作狀態/應用/ [APP-ID] /職位/ [作業ID]

您可以設置JobGroupId和JobGroupDescription,以便跟蹤正在處理的作業組。即setJobGroup

假設你會打電話給JobGroupId「測試」

sc.setJobGroup("1", "Test job") 

當你調用http://localhost:4040/api/v1/applications/[app-id]/jobs/[job-id]

你會得到一個JSON與該職位的描述性名稱:

{ 
    "jobId" : 3, 
    "name" : "count at <console>:25", 
    "description" : "Test Job", 
    "submissionTime" : "2017-02-22T05:52:03.145GMT", 
    "completionTime" : "2017-02-22T05:52:13.429GMT", 
    "stageIds" : [ 3 ], 
    "jobGroup" : "1", 
    "status" : "SUCCEEDED", 
    "numTasks" : 4, 
    "numActiveTasks" : 0, 
    "numCompletedTasks" : 4, 
    "numSkippedTasks" : 0, 
    "numFailedTasks" : 0, 
    "numActiveStages" : 0, 
    "numCompletedStages" : 1, 
    "numSkippedStages" : 0, 
    "numFailedStages" : 0 
} 
+0

我沒有嘗試,因爲在Spark Java API中有一個函數可以複製REST API的每個端點。我在Java API中嘗試了所有這些功能。您能否告訴我您認爲可以解決問題的REST API的哪個終點?然後,我可以使用Java API在此處發佈該端點的輸出。 –

+0

JSON與我得到的完全一樣。但我怎麼理解「jobId」:3是爲「filterByAColumn」或「takingSample」這樣的步驟? –

+0

爲什麼你不能使用jobGroup或JobDescription? – liorsolomon

相關問題