據我所知,有三個工作,因爲有三個動作
我甚至會說,有可能是更多的星火工作,但最小值是3
。這一切都取決於變革的實施和所採取的行動。
我不理解的是爲什麼在作業1個階段4,5和6是相同的階段0,1和作業0的2和同樣的情況對於工作2.
作業1是在RDD上運行的一些操作的結果,finalRdd
。該RDD是使用(向後退出)創建的:join
,textFile
,map
和distinct
。
val people = sc.textFile("people.csv").map { line =>
val tokens = line.split(",")
val key = tokens(2)
(key, (tokens(0), tokens(1))) }.distinct
val cities = sc.textFile("cities.csv").map { line =>
val tokens = line.split(",")
(tokens(0), tokens(1))
}
val finalRdd = people.join(cities)
運行上述,你會看到相同的DAG。現在

,當你執行leftOuterJoin
或rightOuterJoin
行動,你會得到另外兩個的DAG。您正在使用先前使用的RDD來運行新的Spark作業,因此您會看到相同的階段。
爲什麼階段4和9跳過
通常情況下,星火會跳過某些階段的執行。灰色階段是已經計算出來的階段,所以Spark會重用它們,從而使性能更好。

我怎麼知道會比作業只看到(執行任何東西之前)Java代碼更什麼階段?
這就是RDD譜系(圖)提供的。
scala> people.leftOuterJoin(cities).toDebugString
res15: String =
(3) MapPartitionsRDD[99] at leftOuterJoin at <console>:28 []
| MapPartitionsRDD[98] at leftOuterJoin at <console>:28 []
| CoGroupedRDD[97] at leftOuterJoin at <console>:28 []
+-(2) MapPartitionsRDD[81] at distinct at <console>:27 []
| | ShuffledRDD[80] at distinct at <console>:27 []
| +-(2) MapPartitionsRDD[79] at distinct at <console>:27 []
| | MapPartitionsRDD[78] at map at <console>:24 []
| | people.csv MapPartitionsRDD[77] at textFile at <console>:24 []
| | people.csv HadoopRDD[76] at textFile at <console>:24 []
+-(3) MapPartitionsRDD[84] at map at <console>:29 []
| cities.csv MapPartitionsRDD[83] at textFile at <console>:29 []
| cities.csv HadoopRDD[82] at textFile at <console>:29 []
正如你可以看到自己,你將最終獲得4個階段,因爲有3間洗牌的依賴關係(與分區數的邊緣)。
圓括號中的數字是DAGScheduler
最終將用於創建具有確切數量任務的任務集的分區數。每個階段一個TaskSet
。