2015-08-21 18 views
2

在包含分區的火花數據集的簡單情況每個鍵是僅僅在單個分區目前,像在下面的兩個分區的情況下:Spark中的洗牌「總是」移動數據,即使是在微不足道的情況下?

  1. [(「一」,1) ,( 「A」,2)]
  2. [( 「b」,1)],

將混洗操作(像groupByKey)通常洗牌數據翻過分區,即使沒有必要對這個?

我在問這個問題,因爲洗牌很昂貴,所以這對於大型數據集來說很重要。我的用例恰恰是這樣的:一個大型數據集,每個鍵幾乎總是位於一個分區中。

回答

1

嗯,這取決於。默認groupByKey正在使用HashPartitioner。讓我們假設你只有兩個分區。這意味着,與主要對「一」將進入分區號1

scala> "a".hashCode % 2 
res17: Int = 1 

,並與密鑰「b」對,如果你創建RDD這樣的分區2

scala> "b".hashCode % 2 
res18: Int = 0 

val rdd = sc.parallelize(("a", 1) :: ("a", 2) :: ("b", 1) :: Nil, 2).cache 

有多種情況如何分配數據。首先我們需要一個小幫手:

def addPartId[T](iter: Iterator[T]) = { 
    Iterator((TaskContext.get.partitionId, iter.toList)) 
} 

方案1

rdd.mapPartitions(addPartId).collect 
Array((0,List((b,1))), (1,List((a,1), (a,2)))) 

必要的,因爲所有對已經在正確的分區

方案2

無數據移動
Array((0,List((a,1), (a,2))), (1,List((b,1)))) 

雖然匹配對已經在相同的分區中的所有對必須被移動,因爲分區ID不匹配鍵

方案3

一些混合分佈,其中只有數據的一部分已經被移動:

Array((0,List((a,1))), (1,List((a,2), (b,1)))) 

如果數據是使用HashPartionergroupByKey之前沒有必要對任何洗牌分區。

val rddPart = rdd.partitionBy(new HashPartitioner(2)).cache 
rddPart.mapPartitions(addPartId).collect 

Array((0,List((b,1))), (1,List((a,1), (a,2)))) 

rddPart.groupByKey 
+0

你有一個參考,顯示「默認groupByKey使用HashPartitioner」?我想象是這樣,但無法谷歌這(不知道類名稱HashPartitioner)。現在,在使用這個概念進行搜索之後,我認爲通過爲RDD指定一個自定義分區程序實際上可以避免洗牌(https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ ch04.html),不? – EOL

+1

你可以例如檢查['pyspark。rdd.groupBy'](https://github.com/apache/spark/blob/ffa05c84fe75663fc33f3d954d1cb1e084ab3280/python/pyspark/rdd.py#L1837)或其在Scala API中的等價物。經驗上,你可以比較'rdd.partitioner'和'rdd.groupByKey.partitioner'。關於自定義分區程序,我不明白你爲什麼認爲它可以幫助你。從理論上講,可以先計算關鍵分佈統計數據,然後嘗試優化關鍵分區映射,但不能保證會得到更好的結果,這不是一個小問題。 – zero323

+0

很有參考價值!就我的理解而言,自定義分區程序有助於我的情況:讀取多個文件,每行生成一個密鑰;這些文件的特殊性質是給定的鍵幾乎總是在單個文件中。但是,爲了恢復那些跨越多個文件的罕見密鑰,需要進行洗牌。我正在考慮計算一個分區器,它將每個鍵映射到最常見的分區(在我的具體情況下,應該可能只有一個這樣的分區)。不過,你是對的:這可能不會使整個操作更快。 – EOL

相關問題