2016-03-04 143 views
5

假設我創造這樣的RDD(我用Pyspark):Spark如何決定如何分區RDD?

list_rdd = sc.parallelize(xrange(0, 20, 2), 6) 

然後我打印與glom()方法分區元素,並獲得

[[0], [2, 4], [6, 8], [10], [12, 14], [16, 18]] 

是如何星火決定如何分割我的列表?元素的具體選擇從哪裏來?它可以以不同的方式耦合它們,留下除0和10之外的其他元素,以創建6個請求的分區。第二次運行時,分區是相同的。

使用較大的範圍內,29元件,我得到分區中接着是三個元件2個元件的圖案:

list_rdd = sc.parallelize(xrange(0, 30, 2), 6) 
[[0, 2], [4, 6, 8], [10, 12], [14, 16, 18], [20, 22], [24, 26, 28]] 

使用較小的範圍9種元素的我得到

list_rdd = sc.parallelize(xrange(0, 10, 2), 6) 
[[], [0], [2], [4], [6], [8]] 

所以我推斷Spark是通過將列表分割成一個配置來生成分區,其中最小的可能是後面跟着更大的集合,然後重複。

問題是,如果這個選擇背後有一個原因,這是非常優雅的,但它也提供性能優勢?

回答

2

除非您指定了特定的分區程序,否則這是「隨機的」,因爲它取決於該RDD的特定實現。在這種情況下,您可以前往ParallelCollectionsRDD進一步深入研究。

getPartitions被定義爲:

val slices = ParallelCollectionRDD.slice(data, numSlices).toArray 
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray 

其中slice被註釋掉的(重新格式化,以更好地適應):

/** 
* Slice a collection into numSlices sub-collections. 
* One extra thing we do here is to treat Range collections specially, 
* encoding the slices as other Ranges to minimize memory cost. 
* This makes it efficient to run Spark over RDDs representing large sets of numbers. 
* And if the collection is an inclusive Range, 
* we use inclusive range for the last slice. 
*/ 

注意,有一些注意事項與問候的記憶。所以,這又將是具體實施。

相關問題