2016-09-20 92 views
1

如何在pyspark中將groupbyKey轉換爲reduceByKey。我附上了一個片段。這將爲每個區域部門周組合應用一個更正。我使用了groupbyKey,但它非常緩慢和Shuffle錯誤(我有10-20GB的數據,每個組將有2-3GB)。請幫我在此重寫使用reduceByKey將groupBYKey轉換爲ReduceByKey Pyspark

數據集

region dept week val1 valu2 
US CS 1  1 2 
US CS 2  1.5 2 
US CS 3  1 2 
US ELE 1  1.1 2 
US ELE 2  2.1 2 
US ELE 3  1 2 
UE CS 1  2 2 

輸出

region dept corr 
US  CS 0.5 
US  ELE 0.6 
UE  CS .3333 

代碼

def testFunction (key, value): 
    for val in value: 
     keysValue = val.asDict().keys() 
     inputpdDF.append(dict([(keyRDD, val[keyRDD]) for keyRDD in keysValue]) 
    pdDF = pd.DataFrame(inputpdDF, columns = keysValue) 
    corr = pearsonr(pdDF['val1'].astype(float), pdDF['val1'].astype(float))[0] 
    corrDict = {"region" : key.region, "dept" : key.dept, "corr": corr}     
    finalRDD.append(Row(**corrDict)) 
    return finalRDD 

resRDD = df.select(["region", "dept", "week", "val1", "val2"])\ 
      .map(lambda r: (Row(region= r.region, dept= r.dept), r))\ 
      .groupByKey()\ 
      .flatMap(lambda KeyValue: testFunction(KeyValue[0], list(KeyValue[1]))) 
+1

reduceByKey在幾種方式上與groupByKey不同,但主要的是aggregate-groupby yield(key,)與reduce產生(key,aggregate,例如之和)之間的差異。因此,從一個到另一個重新編寫就意味着瞭解如何對數據進行單次傳遞(聚合器)功能。請注意,我並沒有打算查看你的「測試功能」。 – Chinny84

+0

@ Chinny84對不起,我錯過了之前需要的輸出格式。有沒有可能引導我採用其他方法? – Harish

回答

0

嘗試:

>>> from pyspark.sql.functions import corr 
>>> df.groupBy("region", "dept").agg(corr("val1", "val2")) 
+0

謝謝,這將工作..我只複製了2列..實際上我的corr計算應該發生在Val1與Val2,val1與val3,val1與val4 ... val1與Valn(第n列)我打算做的像這個aggList = [func.corr(「val1」,col).alias(colname)列col] df.groupBy(「region」,「dept」)。agg(* aggList)....我覺得這應該工作。接下來更大的麻煩是我必須將statsmodels.formula.api.ols()應用於groupByKey方法非常慢的同一組。我們有其他方式嗎?我嘗試了MLLIB,它不會爲我們工作(我需要封閉式解決方案) – Harish

+0

您可以執行多個聚合。我忍不住配方。 – 2016-09-20 22:14:55

相關問題