
Answers

Answer 1 (3 votes)

sortBy is implemented using sortByKey, which depends on a RangePartitioner (JVM) or a partitioning function (Python). When you call sortBy/sortByKey, the partitioner (partitioning function) is initialized eagerly and samples the input RDD to compute partition boundaries. The job you see corresponds to this process.

The actual sorting is performed only when you execute an action on the newly created RDD or its descendants.
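
A minimal sketch of this two-phase behavior, assuming a Spark shell where sc is the SparkContext (variable names are illustrative):

val rdd = sc.parallelize(0 to 8) 
// Calling sortBy alone already runs a job: the RangePartitioner samples 
// the input to compute partition boundaries. 
val sorted = rdd.sortBy(identity) 
// The actual shuffle and sort happen only when an action is invoked: 
sorted.collect()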

Answer 2 (1 vote)

According to the Spark documentation, only an action triggers a job in Spark; transformations are evaluated lazily, only when an action is called on the resulting RDD.

In general that is right, but as you have just experienced, there are a few exceptions, and sortBy is among them (along with zipWithIndex; see the sketch below).

In fact, this was reported in Spark's JIRA and closed with a Won't Fix resolution. See SPARK-1021 sortByKey() launches a cluster job when it shouldn't.
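
zipWithIndex shows the same eager behavior. A minimal sketch, assuming a Spark shell with sc available: when the RDD has more than one partition, zipWithIndex runs a job up front to count the elements in the preceding partitions, even though it is a transformation.

// A job already appears in the web UI after this line, before any action: 
val indexed = sc.parallelize(Seq("a", "b", "c"), 2).zipWithIndex() 
indexed.collect() // Array((a,0), (b,1), (c,2))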

You can see the job run with DAGScheduler logging enabled (and later in the web UI):

scala> sc.parallelize(0 to 8).sortBy(identity) 
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions 
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25) 
INFO DAGScheduler: Parents of final stage: List() 
INFO DAGScheduler: Missing parents: List() 
DEBUG DAGScheduler: submitStage(ResultStage 1) 
DEBUG DAGScheduler: missing: List() 
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents 
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1) 
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25) 
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4) 
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s 
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0 
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s 
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25
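
To reproduce similar output in spark-shell without editing the log4j configuration, one option (a sketch; the exact log lines vary by Spark version, and the INFO level will suppress the DEBUG lines shown above) is:

scala> sc.setLogLevel("INFO")  // raise console logging so DAGScheduler job submissions are printed 
scala> sc.parallelize(0 to 8).sortBy(identity)  // a job is submitted despite no action being called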