2015-08-26 72 views
2

我想要一個Spark Sql的concat函數。 我已經寫了UDF作爲Spark Sql udf可變參數數

sqlContext.udf.register("CONCAT",(args:String*)=>{ 
String out="" 
for(arg<-args) 
    { 
    out+=arg 
    } 
out 
}) 

sqlContext.sql("select col1,col2,CONCAT(col1,col2) from testtable") 

但UDF不工作,我得到一個例外。 如果我嘗試使用固定數量的參數,那麼它工作。 我正在使用spark 1.3.1和scala 2.10.5。

有沒有人遇到這個問題或知道這個解決方案?

+0

您能否提供問題中的例外情況? – ColinMc

回答

2

如果您只想使用原始SQL連接列,則根本不需要自定義UDF。 CONCAT功能已經存在:

val df = sc.parallelize(List(("a", "b", "c"))).toDF("x", "y", "z") 
df.registerTempTable("df") 
sqlContext.sql("SELECT CONCAT(x, y, z) AS xyz FROM df").show 

// +---+ 
// |xyz| 
// +---+ 
// |abc| 
// +---+ 

由於1.5.0您可以直接使用concat/concat_ws功能:

import org.apache.spark.sql.functions.{concat, concat_ws} 

df.select(concat_ws("-", $"x", $"y", $"z").alias("x-y-z")).show 
// +-----+ 
// |x-y-z| 
// +-----+ 
// |a-b-c| 
// +-----+ 

df.select(concat($"x", $"y", $"z").alias("xyz")).show 

// +---+ 
// |xyz| 
// +---+ 
// |abc| 
// +---+ 

參見Spark UDF with varargs

2

您可以使用struct功能像下面這樣做:

val myUDF = udf { 
    (r: Row) => r.toSeq.map(...) // the "r" row contains your arguments 
} 
val df = .... 
df.select(col("col1"), myUDF(struct(col("col2"), col("col3"), col("col4"), ...)))