嗯,這取決於。默認groupByKey
正在使用HashPartitioner
。讓我們假設你只有兩個分區。這意味着,與主要對「一」將進入分區號1
scala> "a".hashCode % 2
res17: Int = 1
,並與密鑰「b」對,如果你創建RDD這樣的分區2
scala> "b".hashCode % 2
res18: Int = 0
:
val rdd = sc.parallelize(("a", 1) :: ("a", 2) :: ("b", 1) :: Nil, 2).cache
有多種情況如何分配數據。首先我們需要一個小幫手:
def addPartId[T](iter: Iterator[T]) = {
Iterator((TaskContext.get.partitionId, iter.toList))
}
方案1
rdd.mapPartitions(addPartId).collect
Array((0,List((b,1))), (1,List((a,1), (a,2))))
必要的,因爲所有對已經在正確的分區
方案2
無數據移動
Array((0,List((a,1), (a,2))), (1,List((b,1))))
雖然匹配對已經在相同的分區中的所有對必須被移動,因爲分區ID不匹配鍵
方案3
一些混合分佈,其中只有數據的一部分已經被移動:
Array((0,List((a,1))), (1,List((a,2), (b,1))))
如果數據是使用HashPartioner
groupByKey
之前沒有必要對任何洗牌分區。
val rddPart = rdd.partitionBy(new HashPartitioner(2)).cache
rddPart.mapPartitions(addPartId).collect
Array((0,List((b,1))), (1,List((a,1), (a,2))))
rddPart.groupByKey
你有一個參考,顯示「默認groupByKey使用HashPartitioner」?我想象是這樣,但無法谷歌這(不知道類名稱HashPartitioner)。現在,在使用這個概念進行搜索之後,我認爲通過爲RDD指定一個自定義分區程序實際上可以避免洗牌(https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ ch04.html),不? – EOL
你可以例如檢查['pyspark。rdd.groupBy'](https://github.com/apache/spark/blob/ffa05c84fe75663fc33f3d954d1cb1e084ab3280/python/pyspark/rdd.py#L1837)或其在Scala API中的等價物。經驗上,你可以比較'rdd.partitioner'和'rdd.groupByKey.partitioner'。關於自定義分區程序,我不明白你爲什麼認爲它可以幫助你。從理論上講,可以先計算關鍵分佈統計數據,然後嘗試優化關鍵分區映射,但不能保證會得到更好的結果,這不是一個小問題。 – zero323
很有參考價值!就我的理解而言,自定義分區程序有助於我的情況:讀取多個文件,每行生成一個密鑰;這些文件的特殊性質是給定的鍵幾乎總是在單個文件中。但是,爲了恢復那些跨越多個文件的罕見密鑰,需要進行洗牌。我正在考慮計算一個分區器,它將每個鍵映射到最常見的分區(在我的具體情況下,應該可能只有一個這樣的分區)。不過,你是對的:這可能不會使整個操作更快。 – EOL