How do I convert groupByKey to reduceByKey in PySpark? I have attached a snippet below. It computes a correlation for each region/dept combination (over the weekly values). I used groupByKey, but it is very slow and fails with shuffle errors (I have 10-20 GB of data, and each group ends up with 2-3 GB). Please help me rewrite this to use reduceByKey instead of groupByKey in PySpark.
Dataset
region dept week val1 val2
US CS 1 1 2
US CS 2 1.5 2
US CS 3 1 2
US ELE 1 1.1 2
US ELE 2 2.1 2
US ELE 3 1 2
UE CS 1 2 2
Output
region dept corr
US CS 0.5
US ELE 0.6
UE CS 0.3333
Code
import pandas as pd
from scipy.stats import pearsonr
from pyspark.sql import Row

def testFunction(key, value):
    # Collect every row of the group into a list of dicts, build a pandas
    # DataFrame, and compute the Pearson correlation of val1 vs. val2.
    inputpdDF = []
    for val in value:
        keysValue = val.asDict().keys()
        inputpdDF.append(dict([(keyRDD, val[keyRDD]) for keyRDD in keysValue]))
    pdDF = pd.DataFrame(inputpdDF, columns=keysValue)
    corr = pearsonr(pdDF['val1'].astype(float), pdDF['val2'].astype(float))[0]
    corrDict = {"region": key.region, "dept": key.dept, "corr": corr}
    return [Row(**corrDict)]
resRDD = df.select(["region", "dept", "week", "val1", "val2"])\
           .rdd\
           .map(lambda r: (Row(region=r.region, dept=r.dept), r))\
           .groupByKey()\
           .flatMap(lambda keyValue: testFunction(keyValue[0], list(keyValue[1])))
reduceByKey differs from groupByKey in several ways, but the main one is the difference between grouping, which yields (key, &lt;iterable of values&gt;), and reducing, which yields (key, aggregate), e.g. a sum. So rewriting from one to the other comes down to working out how to express your computation as a single pass (an aggregating function) over the data. Note that I have not looked at your testFunction. – Chinny84
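Following that suggestion, here is a minimal sketch of the single-pass reduceByKey approach. It maps each row to the sufficient statistics of Pearson's r (count, sums, sums of squares, sum of products) keyed by (region, dept), sums them with reduceByKey, and only computes the correlation at the end, so no group is ever materialised. It assumes the same df and column names as the question; to_stats, add_stats and pearson are hypothetical helper names.

from math import sqrt
from pyspark.sql import Row

def to_stats(r):
    # one row -> ((region, dept), (count, sum_x, sum_y, sum_xx, sum_yy, sum_xy))
    x, y = float(r.val1), float(r.val2)
    return ((r.region, r.dept), (1, x, y, x * x, y * y, x * y))

def add_stats(a, b):
    # element-wise sum of the six running statistics
    return tuple(u + v for u, v in zip(a, b))

def pearson(s):
    # Pearson's r from the accumulated sufficient statistics
    n, sx, sy, sxx, syy, sxy = s
    denom = sqrt((n * sxx - sx * sx) * (n * syy - sy * sy))
    return (n * sxy - sx * sy) / denom if denom else float('nan')

resRDD = df.select(["region", "dept", "week", "val1", "val2"])\
           .rdd\
           .map(to_stats)\
           .reduceByKey(add_stats)\
           .map(lambda kv: Row(region=kv[0][0], dept=kv[0][1], corr=pearson(kv[1])))

Because only six numbers per key cross the shuffle, the 2-3 GB groups never have to fit on a single executor.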
@Chinny84 Sorry, I missed adding the required output format earlier. Could you possibly guide me toward another approach? – Harish
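As an alternative (my own assumption, not something mentioned in the thread): if you are on Spark 1.6 or later, the built-in corr aggregate gives the same per-group Pearson correlation without leaving the DataFrame API, avoiding the shuffle of raw rows entirely.

from pyspark.sql import functions as F

resDF = df.groupBy("region", "dept")\
          .agg(F.corr("val1", "val2").alias("corr"))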