2017-02-26 25 views
0

我猴子修補了org.apache.spark.sql.Column類以添加chainUDF方法。它適用於不需要參數的udfs,我需要幫助才能使udfs具有通用的參數。使用callUDF創建鏈接UDF調用的方法

下面是當前的chainUDF方法定義。

object ColumnExt { 

    implicit class ColumnMethods(c: Column) { 

    def chainUDF(udfName: String): Column = { 
     callUDF(udfName, c) 
    } 

    } 

} 

這裏是chainUDF方法的實際應用。

def appendZ(s: String): String = { 
    s"${s}Z" 
} 

spark.udf.register("appendZUdf", appendZ _) 

def prependA(s: String): String = { 
    s"A${s}" 
} 

spark.udf.register("prependAUdf", prependA _) 

val hobbiesDf = Seq(
    ("dance"), 
    ("sing") 
).toDF("word") 

val actualDf = hobbiesDf.withColumn(
    "fun", 
    col("word").chainUDF("appendZUdf").chainUDF("prependAUdf") 
) 

我想更新chainUDF方法定義所以它需要的Column可選參數列表。事情是這樣的:

def appendWord(s: String, word: String): String = { 
    s"${s}${word}" 
} 

spark.udf.register("appendWordUdf", appendWord _) 

val hobbiesDf = Seq(
    ("dance"), 
    ("sing") 
).toDF("word") 

val actualDf = hobbiesDf.withColumn(
    "fun", 
    col("word").chainUDF("appendZUdf").chainUDF("appendWordUdf", lit("cool")) 
) 

我認爲我們將需要在chainUDF方法定義更新到這樣的事情:

object ColumnExt { 

    implicit class ColumnMethods(c: Column) { 

    def chainUDF(udfName: String, cols: Column* = some_default_value): Column = { 
     callUDF(udfName, c + cols) 
    } 

    } 

} 

我敢肯定有一些斯卡拉魔術做到這一點。

回答

1

的簽名是:

def callUDF(udfName: String, cols: Column*): Column 

,所以你不需要魔法:如果`= some_default_value`被刪除

def chainUDF(udfName: String, cols: Column* = some_default_value): Column = { 
    callUDF(udfName, c +: cols: _*) 
} 
+0

你的回答工作。如果你可以在'c +:cols:_ *)'的基礎上增加一些描述,我會很棒。謝謝! – Powers