0
我猴子修補了org.apache.spark.sql.Column
類以添加chainUDF
方法。它適用於不需要參數的udfs,我需要幫助才能使udfs具有通用的參數。使用callUDF創建鏈接UDF調用的方法
下面是當前的chainUDF
方法定義。
object ColumnExt {
implicit class ColumnMethods(c: Column) {
def chainUDF(udfName: String): Column = {
callUDF(udfName, c)
}
}
}
這裏是chainUDF
方法的實際應用。
def appendZ(s: String): String = {
s"${s}Z"
}
spark.udf.register("appendZUdf", appendZ _)
def prependA(s: String): String = {
s"A${s}"
}
spark.udf.register("prependAUdf", prependA _)
val hobbiesDf = Seq(
("dance"),
("sing")
).toDF("word")
val actualDf = hobbiesDf.withColumn(
"fun",
col("word").chainUDF("appendZUdf").chainUDF("prependAUdf")
)
我想更新chainUDF
方法定義所以它需要的Column
可選參數列表。事情是這樣的:
def appendWord(s: String, word: String): String = {
s"${s}${word}"
}
spark.udf.register("appendWordUdf", appendWord _)
val hobbiesDf = Seq(
("dance"),
("sing")
).toDF("word")
val actualDf = hobbiesDf.withColumn(
"fun",
col("word").chainUDF("appendZUdf").chainUDF("appendWordUdf", lit("cool"))
)
我認爲我們將需要在chainUDF
方法定義更新到這樣的事情:
object ColumnExt {
implicit class ColumnMethods(c: Column) {
def chainUDF(udfName: String, cols: Column* = some_default_value): Column = {
callUDF(udfName, c + cols)
}
}
}
我敢肯定有一些斯卡拉魔術做到這一點。
你的回答工作。如果你可以在'c +:cols:_ *)'的基礎上增加一些描述,我會很棒。謝謝! – Powers