2017-02-08 72 views
0

我有幾個UDF,我想隨同數據框一起作爲函數參數傳遞。Spark UDF作爲函數參數,UDF不在函數範圍內

這樣做的一種方式可能是在函數中創建UDF,但是這會創建並銷燬UDF的多個實例而不重用它,這可能不是解決此問題的最佳方法。

這裏的代碼樣片 -

val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0} 

val df = inputDF1 
    .withColumn("new_col", lkpUDF(col("c1"))) 
val df2 = inputDF2. 
    .withColumn("new_col", lkpUDF(col("c1"))) 

而不是做上述情況,我非常想要做這樣的事情 -

val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0} 

def appendCols(df: DataFrame, lkpUDF: ?): DataFrame = { 

    df 
     .withColumn("new_col", lkpUDF(col("c1"))) 

    } 
val df = appendCols(inputDF, lkpUDF) 

以上UDF是非常簡單的,但在我的情況下,它可以返回一個原始類型或用戶定義的案例類類型。任何想法/指針將不勝感激。謝謝。

回答

3

您與適當的簽名功能需要是這樣的:

import org.apache.spark.sql.UserDefinedFunction 

def appendCols(df: DataFrame, func: UserDefinedFunction): DataFrame = { 
    df.withColumn("new_col", func(col("col1"))) 
} 

斯卡拉REPL是在返回初始化值的類型非常有用。

scala> val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0} 
lkpUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,List(IntegerType)) 

此外,如果你通入udf包裝函數的簽名由一個Any返回類型(這將是的情況下,如果該函數可以返回一個原語或用戶定義的情況下的類), UDF將無法編譯,例如:

java.lang.UnsupportedOperationException: Schema for type Any is not supported 
+0

謝謝septra。你是對的,我遇到了上述錯誤。但即使在我僅嘗試返回一個案例類時,我似乎也遇到了這個錯誤。 http://stackoverflow.com/questions/42121649/schema-for-type-any-is-not-supported – Yash