根據Spark文檔,只有RDD操作可以觸發Spark作業,並且在對其執行操作時對轉換進行懶惰評估。爲什麼sortBy轉換觸發Spark作業?
我看到sortBy
立即應用轉換函數,它在SparkUI中顯示爲作業觸發器。爲什麼?
根據Spark文檔,只有RDD操作可以觸發Spark作業,並且在對其執行操作時對轉換進行懶惰評估。爲什麼sortBy轉換觸發Spark作業?
我看到sortBy
立即應用轉換函數,它在SparkUI中顯示爲作業觸發器。爲什麼?
sortBy
使用sortByKey
實現,這取決於RangePartitioner
(JVM)或分區函數(Python)。當您致電sortBy
/sortByKey
分區程序(分區函數)被急切初始化並對輸入RDD進行採樣以計算分區邊界。你看到的工作對應於這個過程。
僅當您對新創建的RDD
或其後代執行操作時,纔會執行實際排序。
根據Spark文檔,只有該操作會在Spark中觸發一個作業,當對該操作進行操作時,轉換將被延遲評估。
一般來說,你是對的,但就像剛纔經歷,很少有例外和sortBy
是其中(與zipWithIndex
)。
事實上,它在Spark的JIRA中報告並以Will not Fix分辨率結束。見SPARK-1021 sortByKey() launches a cluster job when it shouldn't。
您可以看到任務運行與DAGScheduler
啓用日誌記錄(以及後來在網絡用戶界面):
scala> sc.parallelize(0 to 8).sortBy(identity)
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25)
INFO DAGScheduler: Parents of final stage: List()
INFO DAGScheduler: Missing parents: List()
DEBUG DAGScheduler: submitStage(ResultStage 1)
DEBUG DAGScheduler: missing: List()
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1)
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25)
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4)
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25