2017-02-07 71 views
1

我有這樣的自定義斯卡拉對象(基本上是一個Java POJO):過濾RDD與CustomObject,類型不匹配

object CustomObject { 

    implicit object Mapper extends JavaBeanColumnMapper[CustomObject] 

} 


class CustomObject extends Serializable { 


    @BeanProperty 
    var amount: Option[java.lang.Double] = _ 

    ... 
} 

在我的主類,我已經加載包含這些CustomObjects的RDD。 我試圖對其進行過濾,並創建一個只包含有量> 5000

val customObjectRDD = sc.objectFile[CustomObject]("objectFiles") 
val filteredRdd = customObjectRDD.filter(x => x.amount > 5000) 
println(filteredRdd.count()) 

然而,我的編輯說

類型不匹配的對象的新RDD:預期:(CustomObject)= >布爾值,實際值: (CustomObject)=>任何

我該怎麼做才能使其工作?

回答

3

>運營商未在Option[Double]定義,你的過濾謂詞將需要處理的Option

scala> case class A(amount: Option[Double]) 
defined class A 

scala> val myRDD = sc.parallelize(Seq(A(Some(10000d)), A(None), A(Some(5001d)), A(Some(5000d)))) 
myRDD: org.apache.spark.rdd.RDD[A] = ParallelCollectionRDD[12] at parallelize at <console>:29 

scala> myRDD.filter(_.amount.exists(_ > 5000)).foreach{println} 
A(Some(10000.0)) 
A(Some(5001.0)) 

這是假設與amount = None任何對象應該失敗過濾器謂詞。有關Option.exists的定義,請參見the docs