在您的例子有與不能用於行和整個數據幀UDF功能的問題。 UDF只能應用於單行,但Spark也支持在整個DataFrame上實現UDAF(用戶定義的聚合函數)。
解決你的問題,你可以使用下面的功能:
from pyspark.sql.functions import mean
def normalize(df, column):
average = df.agg(mean(df[column]).alias("mean")).collect()[0]["mean"]
return df.select(df[column] - average)
使用方法如下:
normalize(df, "Fare")
請注意,以上僅適用於單個列,但它是可以實現的東西更通用:
def normalize(df, columns):
selectExpr = []
for column in columns:
average = df.agg(mean(df[column]).alias("mean")).collect()[0]["mean"]
selectExpr.append(df[column] - average)
return df.select(selectExpr)
使用它像:
normalize(df, ["col1", "col2"])
這工作,但你需要爲每列運行聚集,所以有很多列的行爲可能是問題,但有可能只生成一個聚集表達式:
def normalize(df, columns):
aggExpr = []
for column in columns:
aggExpr.append(mean(df[column]).alias(column))
averages = df.agg(*aggExpr).collect()[0]
selectExpr = []
for column in columns:
selectExpr.append(df[column] - averages[column])
return df.select(selectExpr)
這是偉大的工作,彼得·!非常感謝你。 – Balla13