正如問題所述,我想使用由orElse組成的部分函數作爲火花中的udf。這裏是可以在火花shell中運行一個例子:如何在火花中使用由orElse構成的部分函數作爲udf
val df = sc.parallelize(1 to 15).toDF("num")
df.show
//Testing out a normal udf - this works
val gt5: (Int => String) = num => (num > 5).toString
val gt5Udf = udf(gt5)
df.withColumn("gt5", gt5Udf(col("num"))).show
//Now create a udf of a partial function composed with orElse
val baseline: PartialFunction[Int, String] = { case _ => "baseline" }
val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" }
val ge7: PartialFunction[Int, String] = { case x if x >= 7 => ">=7" }
val ge12: PartialFunction[Int, String] = { case x if x >= 12 => ">=12" }
val composed: PartialFunction[Int, String] = ge12 orElse ge7 orElse ge3 orElse baseline
val composedUdf = udf(composed)
//This fails (but this is what I'd like to do)
df.withColumn("pf", composedUdf(col("num"))).show
//Use a partial function not composed with orElse - this works
val baselineUdf = udf(baseline)
df.withColumn("pf", baselineUdf(col("num"))).show
我目前有以下配置的三個節點的獨立的集羣上運行以下命令:
- 火花:1.6.0
- HDFS:2.4.1
- 階:2.10.5
我發現了什麼,我認爲是這個答案的線索:Why Scala can serialize Function but not PartialFunction?
所以我嘗試:
scala> composed.isInstanceOf[Serializable]
res: Boolean = false
scala> composedUdf.isInstanceOf[Serializable]
res: Boolean = true
scala> baseline.isInstanceOf[Serializable]
res: Boolean = true
scala> baselineUdf.isInstanceOf[Serializable]
res: Boolean = true
我越來越模糊這裏,但似乎組成部分的功能與否則容易去除的序列化?
我認爲最翔實的錯誤是:
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: scala.PartialFunction$OrElse
...
我該如何解決呢?或者我離開基地?
在此先感謝您的幫助!
有趣的問題,我希望我有一個答案!我不。對你來說可能有用的是Spark中的UDF是Catalyst Optimizer的黑盒子,因爲它們沒有進行優化,所以應該謹慎使用。通常,您可以使用Spark SQL的現有功能獲得更好的結果。另外'(1到15).toDF(「num」)'也可以。 –
呃。雖然'PartialFunction'只是一個特性,'orElse'必須返回一個具體的'PartialFunction' - 所以它會讓人聯想起來,除非該類最終不可序列化(即使沒有很好的理由成爲)。存在你的問題。 – Alec
它應該工作,如果你解除它並將其包裝在另一個函數中,就像'val composed:Int => Option [String] = x =>(ge12或else ge7 orElse ge3 orElse baseline).lift.apply(x)',如果那是你可以生活的東西。 – lpiepiora