2017-06-05 100 views
1

RDD subtract does not work the way I expect; here is a simple example with a user-defined type

scala> rdd2.collect
res45: Array[Person] = Array(Person(Mary,28,New York), Person(Bill,17,Philadelphia), Person(Craig,34,Philadelphia), Person(Leah,26,Rochester))

scala> rdd3.collect
res44: Array[Person] = Array(Person(Mary,28,New York), Person(Bill,17,Philadelphia), Person(Craig,35,Philadelphia), Person(Leah,26,Rochester))

scala> rdd2.subtract(rdd3).collect
res46: Array[Person] = Array(Person(Mary,28,New York), Person(Leah,26,Rochester), Person(Bill,17,Philadelphia), Person(Craig,34,Philadelphia))

I expected rdd2.subtract(rdd3).collect to return only Person(Craig,34,Philadelphia), but the output is all of rdd2. Can anyone explain why?

Answers

0

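This is a known issue with the Scala REPL: equality checks on case classes defined in the REPL do not work correctly. Try the workaround below. The problem only occurs in the REPL and goes away when you run the application through spark-submit. It is described in detail in this ticket.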

scala> :paste -raw // make sure you are using Scala 2.11 for the raw option to work. 
// Entering paste mode (ctrl-D to finish) 

package mytest; 
case class Person(name: String, age: Int, city: String); 

// Exiting paste mode, now interpreting. 

scala> import mytest.Person 

scala> val rdd2 = sc.parallelize(Seq(Person("Mary",28,"New York"), Person("Bill",17,"Philadelphia"), Person("Craig",34,"Philadelphia"), Person("Leah",26,"Rochester"))) 
rdd2: org.apache.spark.rdd.RDD[mytest.Person] = ParallelCollectionRDD[6] at parallelize at <console>:25 


scala> val rdd3 = sc.parallelize(Seq(Person("Mary",28,"New York"), Person("Bill",17,"Philadelphia"), Person("Craig",35,"Philadelphia"), Person("Leah",26,"Rochester"))) 
rdd3: org.apache.spark.rdd.RDD[mytest.Person] = ParallelCollectionRDD[7] at parallelize at <console>:25 

scala> rdd2.subtract(rdd3).collect 
res1: Array[mytest.Person] = Array(Person(Craig,34,Philadelphia)) 
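
To see why equality matters here, the following is a minimal plain-Scala sketch, not Spark's implementation. `subtractLocal` is a hypothetical local stand-in for `RDD.subtract`, and `PersonNoEq` is a made-up class that compares by reference; it only imitates the symptom seen in the REPL, not its actual cause.

```scala
// Plain class: no equals/hashCode override, so instances compare by reference.
// Hypothetical, used only to imitate the broken-equality symptom.
class PersonNoEq(val name: String, val age: Int, val city: String)

// Case class: the compiler generates structural equals/hashCode.
case class Person(name: String, age: Int, city: String)

object SubtractSketch {
  // Hypothetical local stand-in for RDD.subtract:
  // keep the elements of `left` that are not equal to any element of `right`.
  def subtractLocal[A](left: Seq[A], right: Seq[A]): Seq[A] = {
    val seen = right.toSet
    left.filterNot(seen.contains)
  }

  def main(args: Array[String]): Unit = {
    val good2 = Seq(Person("Mary", 28, "New York"), Person("Bill", 17, "Philadelphia"),
      Person("Craig", 34, "Philadelphia"), Person("Leah", 26, "Rochester"))
    val good3 = Seq(Person("Mary", 28, "New York"), Person("Bill", 17, "Philadelphia"),
      Person("Craig", 35, "Philadelphia"), Person("Leah", 26, "Rochester"))

    // Structural equality: only the element that differs is left over.
    println(subtractLocal(good2, good3)) // List(Person(Craig,34,Philadelphia))

    val bad2 = good2.map(p => new PersonNoEq(p.name, p.age, p.city))
    val bad3 = good3.map(p => new PersonNoEq(p.name, p.age, p.city))

    // Reference equality: nothing matches, so all four elements come back.
    println(subtractLocal(bad2, bad3).size) // 4
  }
}
```

When no element of one collection ever compares equal to an element of the other, the subtraction returns the whole left-hand side, which matches what the question shows.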
+0

Thanks for your answer, it works well in my case. But how does using paste mode solve the problem?

+0

The solution requires putting the case class in a package (mytest in this example). To do that, we need paste mode with the -raw option.

-1

Here is an example that I tested on Spark 2.0.2 and that works as expected; I think it should work in your case too. Try running it locally rather than in the REPL.

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int, address: String)

// Local SparkSession, for running outside the REPL
val spark = SparkSession.builder().master("local").appName("test").getOrCreate()

// Despite the df* names, these are RDD[Person], not DataFrames
val df1 = spark.sparkContext.parallelize(Array(
  Person("Mary", 28, "New York"),
  Person("Bill", 17, "Philadelphia"),
  Person("Craig", 34, "Philadelphia"),
  Person("Leah", 26, "Rochester")))

val df2 = spark.sparkContext.parallelize(Array(
  Person("Mary", 28, "New York"),
  Person("Bill", 17, "Philadelphia"),
  Person("Craig", 35, "Philadelphia"),
  Person("Leah", 26, "Rochester")))

// Print the elements of df1 that have no equal element in df2
df1.subtract(df2).collect.foreach(println)

Output:

Person(Craig,34,Philadelphia) 
+0

Thanks for your answer, I got the point.