2016-09-01 95 views
-1

我一直在嘗試定義一個函數,它在Spark的DataFrame中工作,它將scala集作爲輸入並輸出一個整數。我收到以下錯誤:Spark sqlContext UDF作用於集

org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 25.0 failed 1 times, most recent failure: Lost task 20.0 in stage 25.0 (TID 473, localhost): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Set 

下面是一個簡單的代碼,使該問題的關鍵所在:

// generate sample data 
case class Dummy(x:Array[Integer]) 
val df = sqlContext.createDataFrame(Seq(
    Dummy(Array(1,2,3)), 
    Dummy(Array(10,20,30,40)) 
)) 

// define the UDF 
import org.apache.spark.sql.functions._ 
def setSize(A:Set[Integer]):Integer = { 
    A.size 
} 
// For some reason I couldn't get it to work without this valued function 
val sizeWrap: (Set[Integer] => Integer) = setSize(_) 
val sizeUDF = udf(sizeWrap) 

// this produces the error 
df.withColumn("colSize", sizeUDF('x)).show 

缺少什麼我在這裏?我怎樣才能使這個工作?我知道我可以通過投射到RDD來做到這一點,但我不想在RDD和DataFrame之間來回切換。

回答

1

使用Seq

val sizeUDF = udf((x: Seq) => setSize(x.toSet)) 
+0

感謝。有用。我還設法推廣到2套作爲輸入(這是我最初的需要) – Avision

相關問題