2016-04-28 78 views
3

在我的項目,我想實現ADD(+)函數,但我也許參數LongTypeDoubleTypeIntType。我使用sqlContext.udf.register("add",XXX),但我不知道如何編寫XXX,這是製作泛型函數。在Spark SQL中,您如何註冊和使用通用UDF?

+0

我很好奇 - 你爲什麼要編寫自己的'+'實現?已經有一個加號功能,例如'df.select(COL( 「一」)+山口( 「B」))' –

+0

對不起,我的意思是,例如,列的參數( 「A」)是IntType上,列的參數(「B 「)是LongType,col(」c「)的參數是DoubleType,現在我想實現Add 1,對於任何人,我都可以編寫sqlContext.udf.register(」add「,(x:Int或Double或Long)= > X + 1),但我不知道如何使用一個funtion解決所有的通用functions.Can你幫我,謝謝 – yjxyjx

+0

好了,這也可以用沒有新的UDF完成:'df.select(COL( 「一」)+亮(1))'會爲任何數值類型列'了'的工作:)但我明白您的實際問題是關於類型的問題,以及如何實現一些具體的事情(是嗎?) –

回答

0

我不認爲你可以註冊一個通用的UDF。

如果我們看一看在register方法 的signature(實際上,這只是22個register重載之一,用於UDF的與一個說法,別人是等價的):

def register[RT: TypeTag, A1: TypeTag](name: String, func: Function1[A1, RT]): UserDefinedFunction 

我們可以看到,它的參數與A1: TypeTag類型 - TypeTag意味着在註冊時,我們必須有實際的類型UDF的論點的證據。因此 - 傳遞一個通用函數func而不顯式鍵入它不能編譯。

對於你的情況,你也許可以利用星火對自動轉換數字類型的能力 - 寫僅Double個UDF,你也可以將它應用於Int S(輸出將是Double,雖然) :

sqlContext.udf.register("add", (i: Double) => i + 1) 

// creating a table with Double and Int types: 
sqlContext.createDataFrame(Seq((1.5, 4), (2.2, 5))).registerTempTable("table1") 

// applying UDF to both types: 
sqlContext.sql("SELECT add(_1), add(_2) FROM table1").show() 

// output: 
// +---+---+ 
// |_c0|_c1| 
// +---+---+ 
// |2.5|5.0| 
// |3.2|6.0| 
// +---+---+ 
+0

謝謝,但是如果你的類型是LongType,那是錯誤的,說long不是強制轉換爲double。現在,我使用java.lang.Number,Double/Long/Int的超類實現它。 – yjxyjx

4

您可以通過創建一個StructTypestruct($"col1", $"col2")保存你的價值觀和你的UDF工作過的這個創建一個通用UDF。它被傳遞到您的UDF作爲對象,所以你可以做這樣的事情:

val multiAdd = udf[Double,Row](r => { 
    var n = 0.0 
    r.toSeq.foreach(n1 => n = n + (n1 match { 
    case l: Long => l.toDouble 
    case i: Int => i.toDouble 
    case d: Double => d 
    case f: Float => f.toDouble 
    })) 
    n 
}) 

val df = Seq((1.0,2),(3.0,4)).toDF("c1","c2") 
df.withColumn("add", multiAdd(struct($"c1", $"c2"))).show 
+---+---+---+ 
| c1| c2|add| 
+---+---+---+ 
|1.0| 2|3.0| 
|3.0| 4|7.0| 
+---+---+---+ 

你甚至可以做一些有趣的事情像帶可變數量的輸入列的。事實上,我們的UDF上面已經定義並說:

val df = Seq((1, 2L, 3.0f,4.0),(5, 6L, 7.0f,8.0)).toDF("int","long","float","double") 

df.printSchema 
root 
|-- int: integer (nullable = false) 
|-- long: long (nullable = false) 
|-- float: float (nullable = false) 
|-- double: double (nullable = false) 

df.withColumn("add", multiAdd(struct($"int", $"long", $"float", $"double"))).show 
+---+----+-----+------+----+ 
|int|long|float|double| add| 
+---+----+-----+------+----+ 
| 1| 2| 3.0| 4.0|10.0| 
| 5| 6| 7.0| 8.0|26.0| 
+---+----+-----+------+----+ 

您甚至可以添加一個硬編碼數字混進去:

df.withColumn("add", multiAdd(struct(lit(100), $"int", $"long"))).show 
+---+----+-----+------+-----+ 
|int|long|float|double| add| 
+---+----+-----+------+-----+ 
| 1| 2| 3.0| 4.0|103.0| 
| 5| 6| 7.0| 8.0|111.0| 
+---+----+-----+------+-----+ 

如果你想使用SQL語法UDF,你可以這樣做:

sqlContext.udf.register("multiAdd", (r: Row) => { 
    var n = 0.0 
    r.toSeq.foreach(n1 => n = n + (n1 match { 
    case l: Long => l.toDouble 
    case i: Int => i.toDouble 
    case d: Double => d 
    case f: Float => f.toDouble 
    })) 
    n 
}) 
df.registerTempTable("df") 

// Note that 'int' and 'long' are column names 
sqlContext.sql("SELECT *, multiAdd(struct(int, long)) as add from df").show 
+---+----+-----+------+----+ 
|int|long|float|double| add| 
+---+----+-----+------+----+ 
| 1| 2| 3.0| 4.0| 3.0| 
| 5| 6| 7.0| 8.0|11.0| 
+---+----+-----+------+----+ 

這工作太:

sqlContext.sql("SELECT *, multiAdd(struct(*)) as add from df").show 
+---+----+-----+------+----+ 
|int|long|float|double| add| 
+---+----+-----+------+----+ 
| 1| 2| 3.0| 4.0|10.0| 
| 5| 6| 7.0| 8.0|26.0| 
+---+----+-----+------+----+