2017-08-16 53 views
-1

UDF模式匹配下面給出df火花多列和集合元素

enter image description here

val df = spark.createDataFrame(Seq(
(1, 2, 3), 
(3, 2, 1) 
)).toDF("One", "Two", "Three") 

與架構: enter image description here

我想編寫一個udf是需要Three columns作爲進出;並返回基於新列的最高輸入值類似如下:

import org.apache.spark.sql.functions.udf 


def udfScoreToCategory=udf((One: Int, Two: Int, Three: Int): Int => { 
    cols match { 
    case cols if One > Two && One > Three => 1 
    case cols if Two > One && Two > Three => 2 
    case _ => 0 
}} 

這將是有趣的,看看如何與vector type作爲輸入做類似:

import org.apache.spark.ml.linalg.Vector 

def udfVectorToCategory=udf((cols:org.apache.spark.ml.linalg.Vector): Int => { 
    cols match { 
    case cols if cols(0) > cols(1) && cols(0) > cols(2) => 1, 
    case cols if cols(1) > cols(0) && cols(1) > cols(2) => 2 
    case _ => 0 
}}) 
+0

問題是如何將多列傳遞給'udf'並根據'invalid syntax'示例執行模式匹配 –

回答

1

的一些問題:

  • 在第一示例cols不在範圍。
  • (...): T => ...對於匿名函數而言是無效的語法。
  • 這裏比在def更好使用val。來定義這個

方式一:

val udfScoreToCategory = udf[Int, (Int, Int, Int)]{ 
    case (one, two, three) if one > two && one > three => 1 
    case (one, two, three) if two > one && two > three => 2 
    case _ => 0 
} 

val udfVectorToCategory = udf[Int, org.apache.spark.ml.linalg.Vector]{ 
    _.toArray match { 
    case Array(one, two, three) if one > two && one > three => 1 
    case Array(one, two, three) if two > one && two > three => 2 
    case _ => 0 
}} 

在一般情況下,你應該使用``第一種情況下when`

import org.apache.spark.sql.functions.when 

when ($"one" > $"two" && $"one" > $"three", 1) 
    .when ($"two" > $"one" && $"two" > $"three", 2) 
    .otherwise(0) 

其中onetwothree是列名。

1

我能找到的最大的元素通過載體的:

val vectorToCluster = udf{ (x: Vector) => x.argmax } 

不過,我仍然疑惑如何做模式匹配的多個列中的值。