2016-04-23 26 views
3

我有一個數據幀,看起來像這樣如何創建火花SQL z評分爲每個組

 dSc  TranAmount 
1: 100021  79.64 
2: 100021  79.64 
3: 100021  0.16 
4: 100022  11.65 
5: 100022  0.36 
6: 100022  0.47 
7: 100025  0.17 
8: 100037  0.27 
9: 100056  0.27 
10: 100063  0.13 
11: 100079  0.13 
12: 100091  0.15 
13: 100101  0.22 
14: 100108  0.14 
15: 100109  0.04 

現在我想創建一個z值每TranAmount的第三列,這將是

(TranAmount-mean(TranAmount))/StdDev(TranAmount) 

這裏均值和標準差將在火花SQL根據每個DSC

現在我可以計算出平均值和STDEV組

datafromdb.groupBy("dSc").agg(datafromdb.dSc,func.avg("TranAmount"),func.stddev_pop("TranAmount")) 

但我就如何實現與數據幀。我的Z得分第三列虧損將不勝感激任何指針實現這一

回答

5

例如,你可以計算統計數據的正確和join與原始數據:

stats = (df.groupBy("dsc") 
    .agg(
     func.stddev_pop("TranAmount").alias("sd"), 
     func.avg("TranAmount").alias("avg"))) 

df.join(broadcast(stats), ["dsc"]) 

(df 
    .join(func.broadcast(stats), ["dsc"]) 
    .select("dsc", "TranAmount", (df.TranAmount - stats.avg)/stats.sd)) 

或使用窗口功能with standard deviation formula

from pyspark.sql.window import Window 
import sys 

def z_score_w(col, w): 
    avg_ = func.avg(col).over(w) 
    avg_sq = func.avg(col * col).over(w) 
    sd_ = func.sqrt(avg_sq - avg_ * avg_) 
    return (col - avg_)/sd_ 

w = Window().partitionBy("dsc").rowsBetween(-sys.maxsize, sys.maxsize) 
df.withColumn("zscore", z_score_w(df.TranAmount, w)) 
+0

我不力完全得到了'rowsBetween(-sys.max大小,了sys.maxsize)'部分 – Bg1850

+2

它相當於'BETWEEN UNBOUNDED PRECEDING和無界FOLLOWING'子句ROWS。 – zero323