Apache Spark DAG behaviour of cogrouped operation

I would like some clarification about the behaviour of the DAG, and about how exactly the following job is handled:
import org.apache.spark.HashPartitioner

val rdd = sc.parallelize(List(1 to 10).flatMap(x => x).zipWithIndex, 3)
  .partitionBy(new HashPartitioner(4))
val rdd1 = sc.parallelize(List(1 to 10).flatMap(x => x).zipWithIndex, 2)
  .partitionBy(new HashPartitioner(3))
val rdd2 = rdd.join(rdd1)
rdd2.collect()
This is the relevant rdd2.toDebugString:
(4) MapPartitionsRDD[6] at join at IntegrationStatusJob.scala:92 []
 |  MapPartitionsRDD[5] at join at IntegrationStatusJob.scala:92 []
 |  CoGroupedRDD[4] at join at IntegrationStatusJob.scala:92 []
 |  ShuffledRDD[1] at partitionBy at IntegrationStatusJob.scala:90 []
 +-(3) ParallelCollectionRDD[0] at parallelize at IntegrationStatusJob.scala:90 []
 +-(3) ShuffledRDD[3] at partitionBy at IntegrationStatusJob.scala:91 []
    +-(2) ParallelCollectionRDD[2] at parallelize at IntegrationStatusJob.scala:91 []
Looking at the toDebugString and at the Spark UI, if I understand it correctly, in order to perform the join the DAG looks at which partitioner should be used, and since both rdds are HashPartitioned, it chooses the partitioner with the larger number of partitions, so the partitioner of rdd.
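A small sanity check that would seem consistent with this reading (the expected values in the comments are just my assumption, not confirmed output):

// If the join reuses the partitioner of rdd, rdd2 should report 4 partitions
// and the same HashPartitioner(4) that rdd was partitioned with.
println(rdd2.getNumPartitions)                // expected: 4
println(rdd2.partitioner)                     // expected: Some(HashPartitioner) with 4 partitions
println(rdd2.partitioner == rdd.partitioner)  // expected: true if the left-side partitioner is reused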
Now, from the Spark UI it seems that the partitionBy of rdd and the join are performed in the same stage, so under these conditions, will the shuffle needed to perform the join be done from one side only? By one side, I mean that only rdd1 would be shuffled, not both.
Is my assumption correct?
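Here is a sketch of how I imagine this could be checked programmatically, assuming (as the debug string above suggests) that the two MapPartitionsRDDs produced by join sit directly above the CoGroupedRDD; printJoinDependencies is just a helper name of mine:

import org.apache.spark.{OneToOneDependency, ShuffleDependency}
import org.apache.spark.rdd.RDD

// Walk from the join result down through the two narrow map steps to the
// CoGroupedRDD, then report how each parent is attached: a narrow dependency
// means that side keeps its partitioning, a shuffle dependency means that
// side is re-shuffled for the join.
def printJoinDependencies(joined: RDD[_]): Unit = {
  val cogrouped = joined.dependencies.head.rdd.dependencies.head.rdd
  cogrouped.dependencies.foreach {
    case d: ShuffleDependency[_, _, _] => println(s"shuffled side: ${d.rdd}")
    case d: OneToOneDependency[_]      => println(s"narrow (not shuffled) side: ${d.rdd}")
    case d                             => println(s"other dependency: $d")
  }
}

printJoinDependencies(rdd2)

If my assumption holds, ShuffledRDD[1] (coming from rdd) would show up as the narrow side and ShuffledRDD[3] (coming from rdd1) as the shuffled side.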