2016-06-23 · 94 views
2

I would like to generate a table of metrics for each task, like the ones collected on the Spark UI when you visit a particular stage. In particular, I am looking for Spark's scheduler delay.

One of those columns is Scheduler Delay, and it does not appear in any of the REST APIs Spark provides. All the other columns show up when I browse /api/v1/applications/[app-id]/stages/[stage-id]/[attempt]/taskList.

How is scheduler delay computed, and is there a way to pull the data out without scraping the Spark UI web pages?

Answers

2

Indeed, scheduler delay is not provided in the history API. For the UI, it is computed as follows:

private[ui] def getSchedulerDelay(
    info: TaskInfo, metrics: TaskMetricsUIData, currentTime: Long): Long = {
  if (info.finished) {
    val totalExecutionTime = info.finishTime - info.launchTime
    val executorOverhead =
      metrics.executorDeserializeTime + metrics.resultSerializationTime
    math.max(0, totalExecutionTime - metrics.executorRunTime - executorOverhead -
      getGettingResultTime(info, currentTime))
  } else {
    // The task is still running and the metrics like executorRunTime are not available.
    0L
  }
}

See https://github.com/apache/spark/blob/branch-2.0/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala, around line 770.
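Since all the inputs to that formula are per-task timings, you can reproduce the UI's number yourself from whatever source exposes them. Here is a minimal sketch in Python of the same computation for a finished task (the function name and the idea of feeding it values pulled from the taskList endpoint are my own; all times are in milliseconds, and getting-result time defaults to 0):

```python
def scheduler_delay_ms(launch_time, finish_time, executor_run_time,
                       executor_deserialize_time, result_serialization_time,
                       getting_result_time=0):
    """Mirror of the Spark UI's getSchedulerDelay for a finished task.

    Scheduler delay is whatever wall-clock time between launch and finish
    is not accounted for by running the task, (de)serialization overhead,
    or fetching the result; clamped at zero like the Scala code.
    """
    total_execution_time = finish_time - launch_time
    executor_overhead = executor_deserialize_time + result_serialization_time
    return max(0, total_execution_time - executor_run_time
               - executor_overhead - getting_result_time)

# Hypothetical task: launched at t=0, finished at t=1000, ran for 800 ms,
# with 50 ms deserialization and 30 ms result serialization.
print(scheduler_delay_ms(0, 1000, 800, 50, 30))  # -> 120
```

Note the `max(0, ...)` clamp: clock skew or overlapping phases can make the subtraction go negative, and the UI reports 0 in that case rather than a negative delay.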

0

At least as of Spark 1.6, if you are looking for the scheduler delay of Spark Streaming batches, you can look at the Spark Streaming UI source code.

It uses a class BatchUIData, in which scheduling delay is defined:

/** 
* Time taken for the first job of this batch to start processing from the time this batch 
* was submitted to the streaming scheduler. Essentially, it is 
* `processingStartTime` - `submissionTime`. 
*/ 
def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
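The `Option` in that definition matters: until the batch's first job has started, `processingStartTime` is empty and there is no delay to report. A minimal sketch of the same logic in Python (function name and sample timestamps are hypothetical; `None` stands in for Scala's `None: Option[Long]`):

```python
def batch_scheduling_delay_ms(submission_time, processing_start_time):
    """Mirror of BatchUIData.schedulingDelay for a streaming batch.

    Returns None while the batch's first job has not started yet,
    otherwise processingStartTime - submissionTime (milliseconds).
    """
    if processing_start_time is None:
        return None
    return processing_start_time - submission_time

# Hypothetical batch submitted at t=1000 ms whose first job started at t=1250 ms.
print(batch_scheduling_delay_ms(1000, 1250))  # -> 250
```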