2016-11-17 233 views
1

給定一個火花數據幀,其看起來像這樣:保火花數據幀列分區

================================== 
| Name | Col1 | Col2 | .. | ColN | 
---------------------------------- 
| A | 1 | 11 | .. | 21 | 
| A | 31 | 41 | .. | 51 | 
| B | 2 | 12 | .. | 22 | 
| B | 32 | 42 | .. | 52 | 
================================== 

我想運行,其執行對對應於表中的一個分區的聚集/計算邏輯一個特定的值爲Name。所述邏輯要求該分區的完整內容 - 並且該分區 - 在執行該邏輯的節點上的存儲器中被物化;它看起來像下面的processSegment功能:

def processDataMatrix(dataMatrix): 
    # do some number crunching on a 2-D matrix 

def processSegment(dataIter): 
    # "running" value of the Name column in the iterator 
    dataName = None 
    # as the iterator is processed, put the data in a matrix 
    dataMatrix = [] 

    for dataTuple in dataIter: 
     # separate the name column from the other columns 
     (name, *values) = dataTuple 
     # SANITY CHECK: ensure that all rows have same name 
     if (dataName is None): 
      dataName = name 
     else: 
      assert (dataName == name), 'row name ' + str(name) + ' does not match expected ' + str(dataName) 

     # put the row in the matrix 
     dataMatrix.append(values) 

    # if any rows were processed, number-crunch the matrix 
    if (dataName is not None): 
     return processDataMatrix(dataMatrix) 
    else: 
     return [] 

我曾嘗試通過基於Name列重新分區,然後在每個分區上運行processSegment,使這項工作通過mapPartitions底層RDD:

result = \ 
    stacksDF \ 
     .repartition('Name') \ 
     .rdd \ 
     .mapPartitions(processSegment) \ 
     .collect() 

然而,這一進程經常未能在processSegmentSANITY CHECK斷言:

AssertionError: row name Q7 does not match expected A9 

當我試圖在底層RDD上運行mapPartitions時,爲什麼在DataFrame上表面上執行的分區不會被保留?如果上述方法無效,是否有某種方法(使用DataFrame API或RDD API),這將使我能夠對DataFrame分區的內存再現執行聚合邏輯?

(由於我使用PySpark,和特定的數字運算,邏輯我想要執行的是Python的,用戶定義的聚合函數(UDAFs)would not appear to be an option。)

回答

1

我相信,你誤會了如何劃分工作。一般來說,partioner是一個滿意的函數,而不是雙射函數。儘管特定值的所有記錄都將移至單個分區,但分區可能包含具有多個不同值的記錄。

DataFrame API不給你在分區的任何控制,但也可以自定義partitionFunc定義使用RDD API時。這意味着你可以使用其中一個是雙射,例如:

mapping = (df 
    .select("Name") 
    .distinct() 
    .rdd.flatMap(lambda x: x) 
    .zipWithIndex() 
    .collectAsMap()) 

def partitioner(x): 
    return mapping[x] 

和按如下方式使用它:

df.rdd.map(lambda row: (row.Name, row)).partitionBy(len(mapping), partitioner) 

雖然有可能你要記住,分區是不是免費的,如果數量獨特的價值很大,它可能會成爲一個嚴重的性能問題。