2017-06-16 68 views
0

我試圖計算數據框中每列的平均值,並從列中的每個元素中減去。我創建了一個函數來嘗試這樣做,但是當我嘗試使用UDF實現它時,出現錯誤:'float'對象沒有屬性'map'。關於如何創建這樣的功能的任何想法?謝謝!從pyspark數據框中減去平均值

def normalize(data): 
     average=data.map(lambda x: x[0]).sum()/data.count() 
     out=data.map(lambda x: (x-average)) 
     return out 

mapSTD=udf(normalize,IntegerType())  
dats = data.withColumn('Normalized', mapSTD('Fare')) 

回答

0

在您的例子有與不能用於行和整個數據幀UDF功能的問題。 UDF只能應用於單行,但Spark也支持在整個DataFrame上實現UDAF(用戶定義的聚合函數)。

解決你的問題,你可以使用下面的功能:

from pyspark.sql.functions import mean 

def normalize(df, column): 
    average = df.agg(mean(df[column]).alias("mean")).collect()[0]["mean"] 
    return df.select(df[column] - average) 

使用方法如下:

normalize(df, "Fare") 

請注意,以上僅適用於單個列,但它是可以實現的東西更通用:

def normalize(df, columns): 
    selectExpr = [] 
    for column in columns: 
     average = df.agg(mean(df[column]).alias("mean")).collect()[0]["mean"] 
     selectExpr.append(df[column] - average) 
    return df.select(selectExpr) 

使用它像:

normalize(df, ["col1", "col2"]) 

這工作,但你需要爲每列運行聚集,所以有很多列的行爲可能是問題,但有可能只生成一個聚集表達式:

def normalize(df, columns): 
    aggExpr = [] 
    for column in columns: 
     aggExpr.append(mean(df[column]).alias(column)) 
    averages = df.agg(*aggExpr).collect()[0] 
    selectExpr = [] 
    for column in columns: 
     selectExpr.append(df[column] - averages[column]) 
    return df.select(selectExpr) 
+0

這是偉大的工作,彼得·!非常感謝你。 – Balla13

相關問題