Spark sqlContext UDF acting on Sets

I have been trying to define a function that works on Spark DataFrames, takes a Scala Set as input, and outputs an integer. I get the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 25.0 failed 1 times, most recent failure: Lost task 20.0 in stage 25.0 (TID 473, localhost): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Set
Here is a minimal piece of code that reproduces the problem:
// generate sample data
case class Dummy(x: Array[Integer])
val df = sqlContext.createDataFrame(Seq(
  Dummy(Array(1, 2, 3)),
  Dummy(Array(10, 20, 30, 40))
))

// define the UDF
import org.apache.spark.sql.functions._
def setSize(A: Set[Integer]): Integer = {
  A.size
}
// For some reason I couldn't get it to work without this function value
val sizeWrap: (Set[Integer] => Integer) = setSize(_)
val sizeUDF = udf(sizeWrap)

// this produces the error
df.withColumn("colSize", sizeUDF('x)).show
What am I missing here? How can I make this work? I know I could do it by converting to an RDD, but I don't want to switch back and forth between RDDs and DataFrames.
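The ClassCastException above points at the likely cause: Spark deserializes an ArrayType column into a WrappedArray, which is a Seq, not an immutable Set, so a UDF declared over Set[Integer] fails at runtime. A sketch of one way around this, under that assumption, is to declare the UDF over Seq[Integer] and convert to a Set inside the function if set semantics are needed:

// Sketch: accept the Seq that Spark actually passes, convert internally.
import org.apache.spark.sql.functions.udf

def setSize(xs: Seq[Integer]): Integer = xs.toSet.size

val sizeUDF = udf(setSize _)

// should now work on the same DataFrame
df.withColumn("colSize", sizeUDF('x)).show()

With this change the duplicate-handling behavior of the original Set-based function is preserved, because the Seq is deduplicated by toSet before counting.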
Thanks, that works. I also managed to generalize it to two sets as input (which was my original need). – Avision
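The two-set generalization the comment mentions is not shown; a hypothetical sketch, again taking Seq arguments and converting inside the UDF (the column name 'y and the intersection-size operation are illustrative assumptions):

// Hypothetical two-column variant, e.g. size of the intersection of two array columns.
import org.apache.spark.sql.functions.udf

val commonSize = udf { (a: Seq[Integer], b: Seq[Integer]) =>
  a.toSet.intersect(b.toSet).size
}

// assumes the DataFrame has a second array column named "y"
// df.withColumn("common", commonSize('x, 'y)).show()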