2017-08-21 21 views
1

我有一個36個元素的RDD。我有一個3個節點的集羣,每個節點有4個核心。我已經將RDD重新分區爲36個部分,以便每個分區可能有一個要處理的元素,但是整個36個元素都是分區的,這樣只有4個部分各有9個元素,其餘部分都是空的,因此沒有任何可處理的內容和服務器資源沒有得到充分利用。如何確保RDD的每個分區都有一些數據

如何重新分區數據以確保每個部分都有一些要處理的數據?我怎樣才能確保每個部件都有3個要素進行處理?

+0

你使用'coalesce'還是'repartition'?我想這也可能是因爲你的元素很少。 – philantrovert

+0

我正在使用重新分區。是的,我的元素太少,在這種情況下只有36個。但是每個元素都有很多處理要做。我希望每個分區都有一些數據,而不是不均勻的重新分區 –

+0

@philantrovert是否有解決此問題的方法,因爲我擁有數百萬條記錄,但有些分區根本沒有接收數據,有些分區的數據可能多達5個 –

回答

5

通過定義repartition(numPartitions)改組在RDD數據隨機產生更多或更少的分區並在它們之間進行平衡,它總是在網絡上慢騰騰的所有數據。

Apache Spark給出的保證是均勻分佈的,但這不會爲每個分區產生完全相同數量的元素。 (也即數據集的大小是非常小的!)

您可以考慮使用HashPartitioner

scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 8) 
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[31] at parallelize at <console>:27 

scala> import org.apache.spark.rdd.RDD 
import org.apache.spark.rdd.RDD 

scala> import org.apache.spark.HashPartitioner 
import org.apache.spark.HashPartitioner 

scala> def countByPartition(rdd: RDD[(Int, None.type)]) = rdd.mapPartitions(iter => Iterator(iter.length)) 
countByPartition: (rdd: org.apache.spark.rdd.RDD[(Int, None.type)])org.apache.spark.rdd.RDD[Int] 

scala> countByPartition(rdd).collect 
res25: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5) 

scala> countByPartition(rdd.partitionBy(new HashPartitioner(12))).collect 
res26: Array[Int] = Array(3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3) 

我已經借了從zero323的答案的例子和助手約How does HashPartitioner work?

我希望這有助於!

編輯:

如果你會做以下幾點:

scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 12) 
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[36] at parallelize at <console>:29 

scala> countByPartition(rdd).collect 
res28: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5) 

結果不會是一定相同。

+0

感謝您的答覆。如果我在第一行本身完成了val rdd = sc.parallelize(對於{x < - 1至36} yield(x,None),12),我是否會得到與countByPartition相同的結果(rdd.partitionBy(new HashPartitioner (12)))。collect –

+0

不可以。你必然會得到相同的結果。 – eliasah

相關問題