0

比方說,我有我的DataFrame,給定的列名爲「X」。我想了解爲什麼第一個代碼不起作用,而第二個代碼卻不行。對我而言,它不會改變任何東西。Spark UDF不工作:如何指定要應用它的列?

,一方面,這不起作用:

val dataDF = sqlContext 
     .read 
     .parquet(input_data) 
     .select(
      "XXX", "YYY", "III" 
    ) 
     .toDF(
      "X", "Y", "I" 
    ) 
     .groupBy(
      "X", "Y" 
    ) 
     .agg(
      sum("I").as("sum_I") 
    ) 
     .orderBy(desc("sum_I")) 
     .withColumn("f_sum_I", udf((x: Long) => f(x)).apply(dataDF("sum_I"))) 
     .drop("sum_I") 

dataDF.show(50, false) 

的IntelliJ不編譯我的代碼,我有以下錯誤:

Error:(88, 67) recursive value dataDF needs type 
     .withColumn("f_sum_I", udf((x: Long) => f(x)).apply(dataDF("sum_I"))) 

在另一方面,這一工作如果我改變給定的行:

.withColumn("f_sum_I", udf((x: Long) => f(x)).apply(col("sum_I"))) 

我所做的只是將呼叫替換爲我的DataFrame列以使用更通用的功能「col」。我不明白它們之間的區別,特別是爲什麼它不喜歡第一種方法(使用DataFrame的名稱)。

回答

2

您正在嘗試使用dataDF大功告成定義它 - dataDF是整個表達式開始sqlContext.read.drop("sumI")結束的結果,所以你不能在表達式中使用它。

您可以通過簡單地引用列而不使用DataFrame來解決這個問題,例如,使用從org.apache.spark.sql.functions功能col

.withColumn("f_sum_I", udf((x: Long) => f(x)).apply(col("sum_I")))