2016-10-07 26 views
5

正如問題所述,我想使用由orElse組成的部分函數作爲火花中的udf。這裏是可以在火花shell中運行一個例子:如何在火花中使用由orElse構成的部分函數作爲udf

val df = sc.parallelize(1 to 15).toDF("num") 
df.show 

//Testing out a normal udf - this works 
val gt5: (Int => String) = num => (num > 5).toString 
val gt5Udf = udf(gt5) 
df.withColumn("gt5", gt5Udf(col("num"))).show 

//Now create a udf of a partial function composed with orElse 
val baseline: PartialFunction[Int, String] = { case _ => "baseline" } 
val ge3: PartialFunction[Int, String] = { case x if x >= 3 => ">=3" } 
val ge7: PartialFunction[Int, String] = { case x if x >= 7 => ">=7" } 
val ge12: PartialFunction[Int, String] = { case x if x >= 12 => ">=12" } 

val composed: PartialFunction[Int, String] = ge12 orElse ge7 orElse ge3 orElse baseline 
val composedUdf = udf(composed) 

//This fails (but this is what I'd like to do) 
df.withColumn("pf", composedUdf(col("num"))).show 

//Use a partial function not composed with orElse - this works 
val baselineUdf = udf(baseline) 
df.withColumn("pf", baselineUdf(col("num"))).show 

我目前有以下配置的三個節點的獨立的集羣上運行以下命令:

  • 火花:1.6.0
  • HDFS:2.4.1
  • 階:2.10.5

我發現了什麼,我認爲是這個答案的線索:Why Scala can serialize Function but not PartialFunction?

所以我嘗試:

scala> composed.isInstanceOf[Serializable] 
res: Boolean = false 

scala> composedUdf.isInstanceOf[Serializable] 
res: Boolean = true 

scala> baseline.isInstanceOf[Serializable] 
res: Boolean = true 

scala> baselineUdf.isInstanceOf[Serializable] 
res: Boolean = true 

我越來越模糊這裏,但似乎組成部分的功能與否則容易去除的序列化?

我認爲最翔實的錯誤是:

org.apache.spark.SparkException: Task not serializable 
... 
Caused by: java.io.NotSerializableException: scala.PartialFunction$OrElse 
... 

我該如何解決呢?或者我離開基地?

在此先感謝您的幫助!

+2

有趣的問題,我希望我有一個答案!我不。對你來說可能有用的是Spark中的UDF是Catalyst Optimizer的黑盒子,因爲它們沒有進行優化,所以應該謹慎使用。通常,您可以使用Spark SQL的現有功能獲得更好的結果。另外'(1到15).toDF(「num」)'也可以。 –

+1

呃。雖然'PartialFunction'只是一個特性,'orElse'必須返回一個具體的'PartialFunction' - 所以它會讓人聯想起來,除非該類最終不可序列化(即使沒有很好的理由成爲)。存在你的問題。 – Alec

+1

它應該工作,如果你解除它並將其包裝在另一個函數中,就像'val composed:Int => Option [String] = x =>(ge12或else ge7 orElse ge3 orElse baseline).lift.apply(x)',如果那是你可以生活的東西。 – lpiepiora

回答

3

它應該工作,如果你提起它幷包裹在另一個功能。

1

雖然這並不直接解決您的問題,我想建議和使用SQL函數的替代解決方案。

首先,您必須導入所需的功能:

import org.apache.spark.sql.functions.{when, lit} 

有的implicits爲簡潔:

import sqlContext.implicits._ 

接下來,您可以表達相同的條件代碼:

val baseline = lit("baseline") 
val ge3 = when($"num" >= 3, ">=3") 
val ge7 = when($"num" >= 7, ">=7") 
val ge12 = when($"num" >= 12, ">=12") 

val composed = ge12 otherwise (ge7 otherwise (ge3 otherwise baseline)) 

在這種形式中,它有點不那麼優雅,但你可以毫不費力地組成expr這樣使用標準收集API(foldLeft/foldRight)和unlike UDFs,可以通過Catalyst Optimizer優化結果。