2017-06-19 36 views
0

我想創建我自己的方差函數來計算DataFrame groupBy聚合步驟中的方差。我不能使用functions.variance()方法,因爲輸入的大小存儲在一列中。我想找到一個列的「方差百分比」,其中項目的總數存儲在名爲「NumberOfItems」的列中。如何創建自定義groupBy聚合器?

我很困惑如何創建我自己的函數返回一個列類型爲.agg()方法需要與列返回類型的函數。

這裏是我所期待的

myDF.groupBy(col("Store"), col("week")).agg(sum(col("PercentEaten")).divide(col("NumberOfItems")).as("MeanPercentEaten"), myVariance(col("PercentEaten"), col("NumberOfItems")).as("VariancePercentEaten"); 

一個例子,我如何去有關定義myVariance()方法只是不確定。這也是我第一次使用Spark,所以我的編碼風格可能不是最好的。

回答

0

我只是不確定如何去定義myVariance()方法。

這是一個用戶定義集合函數(又名UDAF)的一個例子。

要創建一個你必須實現org.apache.spark.sql.expressions.UserDefinedAggregateFunction

實現用戶自定義的聚合函數(UDAF)的基類。

之後,您應該創建自定義UDAF的一個實例,並使用applydistinct方法來使用它。

申請(列... exprs)創建使用給定的列作爲輸入參數此UDAF一列。

distinct(Column ... exprs)使用給定Columns的不同值作爲輸入參數爲此UDAF創建一個列。


(我不會提出任何代碼Java是不是我的語言火花)。

+0

UDAF是否適用於pyspark?似乎只有df.groupBy()。agg() –

+0

我對pyspark一無所知。爲什麼'df.groupBy()。agg()'使UDF在pyspark上不起作用? –

+0

我可以使用另一個聚合器中的一個聚合器的列嗎?例如df.groupBy()。agg(mean(col()。as(「Mean」),variance(col(),col(「Mean」));所以我們是在聚合過程中在方差聚合器中創建的平均值列? – mjsee