我有一個36個元素的RDD。我有一個3個節點的集羣,每個節點有4個核心。我已經將RDD重新分區爲36個部分,以便每個分區可能有一個要處理的元素,但是整個36個元素都是分區的,這樣只有4個部分各有9個元素,其餘部分都是空的,因此沒有任何可處理的內容和服務器資源沒有得到充分利用。如何確保RDD的每個分區都有一些數據
如何重新分區數據以確保每個部分都有一些要處理的數據?我怎樣才能確保每個部件都有3個要素進行處理?
我有一個36個元素的RDD。我有一個3個節點的集羣,每個節點有4個核心。我已經將RDD重新分區爲36個部分,以便每個分區可能有一個要處理的元素,但是整個36個元素都是分區的,這樣只有4個部分各有9個元素,其餘部分都是空的,因此沒有任何可處理的內容和服務器資源沒有得到充分利用。如何確保RDD的每個分區都有一些數據
如何重新分區數據以確保每個部分都有一些要處理的數據?我怎樣才能確保每個部件都有3個要素進行處理?
通過定義,repartition(numPartitions)
改組在RDD數據隨機產生更多或更少的分區並在它們之間進行平衡,它總是在網絡上慢騰騰的所有數據。
Apache Spark給出的保證是均勻分佈的,但這不會爲每個分區產生完全相同數量的元素。 (也即數據集的大小是非常小的!)
您可以考慮使用HashPartitioner
:
scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 8)
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[31] at parallelize at <console>:27
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> def countByPartition(rdd: RDD[(Int, None.type)]) = rdd.mapPartitions(iter => Iterator(iter.length))
countByPartition: (rdd: org.apache.spark.rdd.RDD[(Int, None.type)])org.apache.spark.rdd.RDD[Int]
scala> countByPartition(rdd).collect
res25: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5)
scala> countByPartition(rdd.partitionBy(new HashPartitioner(12))).collect
res26: Array[Int] = Array(3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3)
我已經借了從zero323的答案的例子和助手約How does HashPartitioner work?
我希望這有助於!
編輯:
如果你會做以下幾點:
scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 12)
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[36] at parallelize at <console>:29
scala> countByPartition(rdd).collect
res28: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5)
結果不會是一定相同。
感謝您的答覆。如果我在第一行本身完成了val rdd = sc.parallelize(對於{x < - 1至36} yield(x,None),12),我是否會得到與countByPartition相同的結果(rdd.partitionBy(new HashPartitioner (12)))。collect –
不可以。你必然會得到相同的結果。 – eliasah
你使用'coalesce'還是'repartition'?我想這也可能是因爲你的元素很少。 – philantrovert
我正在使用重新分區。是的,我的元素太少,在這種情況下只有36個。但是每個元素都有很多處理要做。我希望每個分區都有一些數據,而不是不均勻的重新分區 –
@philantrovert是否有解決此問題的方法,因爲我擁有數百萬條記錄,但有些分區根本沒有接收數據,有些分區的數據可能多達5個 –