2016-12-27 60 views
3

我正在寫一個pyspark腳本來讀取一個大的二維數組,因此我試圖首先生成一個索引數組並使用讀取方法映射以讀取相應的數組。例如,如果我有一個包含10行的數組,那麼我希望這10行均勻分區,因爲每個分區有2行。我想這種方式與sortBy():爲什麼sortBy()無法在Spark中均勻排序數據?

rdd = sc.range(0, 10, 1).sortBy(lambda x: x, numPartitions = 5) 
rdd.glom().collect() 

然而,結果顯示爲:

[[0, 1, 2], [3, 4], [5, 6], [7, 8], [9]] 

指示sortBy()沒有工作,如我所料,所以第一個分區有3個數字,而最後一個分區只有1個數字。當我用另一個讀取方法映射每個分區時,分區的大小是不同的,有時會引起失落。

我試着RDDS代的另一種方式:

rdd = sc.parallelize(range(0, 10, 1), 5) 
rdd.glom().collect() 

,並返回我想要的結果。

[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]] 

有人可以幫助解釋爲什麼使用sortBy()的第一個方法不能返回均勻排序的結果嗎?

回答

8

因爲它沒有設計成。在一般情況下,不可能對數據進行分區(包括範圍分區)以實現相同大小的分區。請記住,通過分區器的合同,特定值的所有記錄必須駐留在單個分區上。即使在可能實現均勻分佈的情況下,確定準確的分區邊界也將非常昂貴。

因此,Spark目標樣本數據的目標是獲得範圍爲的大約統一大小,這種行爲對於典型的Spark應用程序來說已經足夠好了。

SparkContext.parallelize根本不使用分區器。相反,它會根據特定輸入的語義來計算分割,因此可以創建大小相等的分割。

如果您有關於數據分發的先前知識,則您始終可以設計自定義分區功能,從而實現所需的輸出。例如:

import bisect 
from functools import partial 

partition_func = partial(bisect.bisect, [2, 4, 6, 8]) 

(sc.range(0, 10) 
    .map(lambda x: (x, None)) 
    .repartitionAndSortWithinPartitions(5, partition_func) 
    .keys()) 

相對較短(最長1 < < 60左右)系列在CPython中,您可以使用散列分區的整數:

(sc.range(0, 10, 1) 
    .map(lambda x: (x, None)) 
    .partitionBy(10) 
    .keys() 
    .glom() 
    .collect()) 
[[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]] 

,但只是實現的細節(hash(x)其中isinstance(x, int)等於x)。

相關問題