Preserving Spark DataFrame column partitioning

Given a Spark DataFrame which looks like this:
==================================
| Name | Col1 | Col2 | .. | ColN |
----------------------------------
| A    | 1    | 11   | .. | 21   |
| A    | 31   | 41   | .. | 51   |
| B    | 2    | 12   | .. | 22   |
| B    | 32   | 42   | .. | 52   |
==================================
I want to run aggregation/computation logic against the rows corresponding to one "partition" of the table, i.e. a particular value of Name. The logic requires the full contents of that partition, and only that partition, to be materialized in memory on the node executing the logic; it looks like the processSegment function below:
def processDataMatrix(dataMatrix):
    # do some number crunching on a 2-D matrix
    pass  # actual computation omitted

def processSegment(dataIter):
    # "running" value of the Name column in the iterator
    dataName = None
    # as the iterator is processed, put the data in a matrix
    dataMatrix = []
    for dataTuple in dataIter:
        # separate the name column from the other columns
        (name, *values) = dataTuple
        # SANITY CHECK: ensure that all rows have the same name
        if dataName is None:
            dataName = name
        else:
            assert dataName == name, \
                'row name ' + str(name) + ' does not match expected ' + str(dataName)
        # put the row in the matrix
        dataMatrix.append(values)
    # if any rows were processed, number-crunch the matrix
    if dataName is not None:
        return processDataMatrix(dataMatrix)
    else:
        return []
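For the sake of a self-contained example, a trivial stand-in for processDataMatrix might look like the following; the column-wise sum is purely an illustrative assumption, and the real number-crunching logic is more involved:

import numpy as np

def processDataMatrix(dataMatrix):
    # illustrative stand-in (an assumption): column-wise sums of the 2-D matrix,
    # returned as a single-element list so mapPartitions gets an iterable back
    return [np.asarray(dataMatrix, dtype=float).sum(axis=0).tolist()]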
I have attempted to make this work by repartitioning on the Name column, then running processSegment on each partition via mapPartitions on the underlying RDD:
result = \
    stacksDF \
        .repartition('Name') \
        .rdd \
        .mapPartitions(processSegment) \
        .collect()
However, the job routinely fails the SANITY CHECK assertion in processSegment:

AssertionError: row name Q7 does not match expected A9
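To see which Name values actually land together, one can dump the distinct names per RDD partition; a minimal diagnostic sketch (the glom-based inspection below is added for illustration, not part of the failing job):

# diagnostic: list the distinct Name values in each partition after repartitioning
names_per_partition = (
    stacksDF
    .repartition('Name')
    .rdd
    .glom()  # one Python list of Rows per partition
    .map(lambda rows: sorted({row['Name'] for row in rows}))
    .collect()
)
print(names_per_partition)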
Why is the partitioning ostensibly performed on the DataFrame not preserved when I attempt to run mapPartitions on the underlying RDD? If the approach above is invalid, is there some approach (using either the DataFrame API or the RDD API) which will enable me to execute aggregation logic against an in-memory rendition of a DataFrame partition?
(Since I am using PySpark, and the particular number-crunching logic I want to execute is Python, user-defined aggregation functions (UDAFs) would not appear to be an option.)
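For completeness: the closest workaround I can see is grouping by key on the RDD so that all rows for one Name are collected together before calling processDataMatrix. A sketch, assuming each Name's rows fit in memory on a single executor; I am unsure whether this is the idiomatic way to do it, hence the question:

result = (
    stacksDF
    .rdd
    .map(lambda row: (row['Name'], list(row)[1:]))  # (Name, [Col1, ..., ColN])
    .groupByKey()                                   # gather all rows for one Name
    .mapValues(lambda rows: processDataMatrix(list(rows)))
    .collect()
)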