2017-03-23 71 views
1

我想了解DAG的行爲作出一些澄清,以及究竟如何已經處理以下工作:阿帕奇星火DAG行爲cogrouped操作

val rdd = sc.parallelize(List(1 to 10).flatMap(x=>x).zipWithIndex,3) 
.partitionBy(new HashPartitioner(4)) 

val rdd1 = sc.parallelize(List(1 to 10).flatMap(x=>x).zipWithIndex,2) 
.partitionBy(new HashPartitioner(3)) 

val rdd2 = rdd.join(rdd1) 
rdd2.collect() 

這是相關rdd2.toDebugString

(4) MapPartitionsRDD[6] at join at IntegrationStatusJob.scala:92 [] 
| MapPartitionsRDD[5] at join at IntegrationStatusJob.scala:92 [] 
| CoGroupedRDD[4] at join at IntegrationStatusJob.scala:92 [] 
| ShuffledRDD[1] at partitionBy at IntegrationStatusJob.scala:90 [] 
+-(3) ParallelCollectionRDD[0] at parallelize at IntegrationStatusJob.scala:90 [] 
+-(3) ShuffledRDD[3] at partitionBy at IntegrationStatusJob.scala:91 [] 
    +-(2) ParallelCollectionRDD[2] at parallelize at IntegrationStatusJob.scala:91 [] 

這是火花UI圖像: enter image description here

看着toDebugString和在th e spark UI,如果我理解的很好,爲了執行聯接,DAG會查看應該使用哪個分區程序,並且因爲這兩個rdds都是HashPartitioned,所以選擇具有更多分區的分區程序,因此rdd分區程序。

現在從火花UI,似乎rddpartitionByjoin正在同臺這種條件下進行的,因此,執行連接所需的洗牌,將會從一個側面剛剛做?從一方面來說,我的意思是隻有rdd1會被洗牌,而不是兩個。

我的假設是否正確?

回答

1

你說得對。如果兩個RDD使用不同的分區器進行分區,則Spark將選擇一個作爲參考,並僅對第二個進行修改/洗牌。

如果兩者具有相同的分區器,則不需要洗牌。