2016-02-19 21 views
1

我試圖加入兩個數據集。其中一個類型(Id,salesRecord)另一個(Id,Name)。 第一個數據集由HashPartitioner分區,第二個數據集由Custom Partitioner分區。當我通過id連接這些RDD並嘗試查看哪些分區信息被保留時,我隨機看到有些時候joinRDD會顯示自定義分區,有時候會顯示HashPartitioner。我還收到了不同的分割結果,同時還更改了分區數量。加入的RDD上的隨機分區程序行爲

根據Learning Spark的書,rdd1.join(rdd2)保留了rdd1的分區信息。

這是代碼。

val hashPartitionedRDD = cusotmerIDSalesRecord.partitionBy(new HashPartitioner(10)) 
println("hashPartitionedRDD's partitioner " + hashPartitionedRDD.partitioner) // Seeing Instance of HashParitioner 

val customPartitionedRDD = customerIdNamePair1.partitionBy(new CustomerPartitioner) 
println("customPartitionedRDD partitioner " + customPartitionedRDD.partitioner) // Seeing instance of CustomPartitioner 

val expectedHash = hashPartitionedRDD.join(customPartitionedRDD) 
val expectedCustom = customPartitionedRDD.join(hashPartitionedRDD) 

println("Expected Hash " + expectedHash.partitioner) // Seeing instance of Custom Partitioner 
println("Expected Custom " + expectedCustom.partitioner) //Seeing instance of Custom Partitioner 

// Just to add more to it when number of partitions of both the data sets I made equal I am seeing the reverse results. i.e. 
// expectedHash shows CustomPartitioner and 
// expectedCustom shows Hashpartitioner Instance. 

回答

4

join方法內部調用,Partitioner.defaultPartitioner()方法。

基於對defaultPartitioner定義:

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { 
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse 
    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) { 
     return r.partitioner.get 
    } 
    if (rdd.context.conf.contains("spark.default.parallelism")) { 
     new HashPartitioner(rdd.context.defaultParallelism) 
    } else { 
     new HashPartitioner(bySize.head.partitions.size) 
    } 
    } 
} 

如果你在排隊仔細一看:基於降序排列分區的數量

val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse 

它開始for-loop(或搜索)。因此,如果RDDs都有自己的分區,分區數量較多的分區將被選中

編輯:您提到關於看到reverse行爲的問題很簡單。在這裏,如果兩者都具有相同數量的分區,則others將位於Seq的頂部。因此,將選擇參數RDD的分區器。

(Seq(rdd) ++ others).sortBy(_.partitions.size).reverse 

此行爲是可以解釋的,但可能不直觀。

+0

Thanks @Mohitt。有你,但是當兩個數據集有相同數量的分區程序時,我看到相反的結果 –

+0

@java_enthu:答案已更新。 – Mohitt