2014-07-22 33 views
8

這裏是我的代碼示例:Apache Spark:截然不同的工作?

case class Person(name:String,tel:String){ 
     def equals(that:Person):Boolean = that.name == this.name && this.tel == that.tel} 

val persons = Array(Person("peter","139"),Person("peter","139"),Person("john","111")) 
sc.parallelize(persons).distinct.collect 

它返回

res34: Array[Person] = Array(Person(john,111), Person(peter,139), Person(peter,139)) 

爲什麼不同不工作我想要的結果是人( 「約翰福音」,111),人( 「彼得」? ,139)

+1

我不知道是否有事情做了 「Peter」 不是一樣的「 perter「? – kviiri

+0

在發佈之前,您花了多少時間來查看問題?作爲這個測試的結果你期望什麼? – maasg

+3

標記爲關閉,因爲這似乎是由簡單的印刷錯誤造成的。 – kviiri

回答

0

正如其他人指出的那樣,這是Spark 1.0.0中的一個錯誤。我到哪裏它是來自理論是,如果你看看1.0.0的DIFF 9.0你看到

- def repartition(numPartitions: Int): RDD[T] = { 
+ def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = { 

如果您運行

case class A(i:Int) 
implicitly[Ordering[A]] 

你得到一個錯誤

<console>:13: error: No implicit Ordering defined for A. 
       implicitly[Ordering[A]] 

所以我認爲解決方法是定義一個案例類的隱式排序,不幸的是我不是一個scala專家,但是這個answer seems to do it correctly

+0

在spark-shell 1.0.1中,Person擴展Ordered [Person]不起作用... – edwardsbean

+0

@MrQuestion誠實地說,這只是一個猜測,我不完全確定隱式解決方案在scala中的工作方式 – aaronman

2

從@aaronman的觀察延伸出來,這個問題有一個解決方法。 在RDD,這裏有兩種定義爲distinct

/** 
    * Return a new RDD containing the distinct elements in this RDD. 
    */ 
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = 
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) 

    /** 
    * Return a new RDD containing the distinct elements in this RDD. 
    */ 
    def distinct(): RDD[T] = distinct(partitions.size) 

,從第一個distinct的簽名是明顯的,必須有元素的隱式順序和它的假設空,如果不存在,這是什麼短版.distinct()呢。

有用於case類沒有默認隱含的排序,但它很容易實現一個:

case class Person(name:String,tel:String) extends Ordered[Person] { 
    def compare(that: Person): Int = this.name compare that.name 
} 

現在,嘗試相同的例子提供了預期的效果(注意,我比較名稱):

val ps5 = Array(Person("peter","138"),Person("peter","55"),Person("john","138")) 
sc.parallelize(ps5).distinct.collect 

res: Array[P5] = Array(P5(john,111), P5(peter,139)) 

請注意,案例類已經實現了equalshashCode,因此提供的示例中的impl是不必要的,也是不正確的。 equals的正確簽名是:equals(arg0: Any): Boolean - 順便說一句,我首先想到這個問題必須處理不正確的等號簽名,這使我看到了錯誤的路徑。

+0

在spark-shell 1.0中,我遵循指導的每一步,但是返回'res2:Array [Person] = Array(Person(john, 138),Person(彼得,138),Person(彼得,55))' – edwardsbean

+0

@MrQuestion你使用了什麼精確版本的Spark?無論你使用什麼,我都會再試一次。 – maasg

+0

我用spark 1.0.0和1.0.1試過了,它們都不起作用 – edwardsbean

1

對我來說,問題與對象相等有關,正如Martin Odersky在Programming in Scala(第30章)中提到的,儘管我有一個普通的類(不是case類)。對於正確的相等性測試,如果您有自定義的equals(),則必須重新定義(覆蓋)hashCode()。你也需要有一個canEqual()方法來保證100%的正確性。我沒有看過RDD的實現細節,但由於它是一個集合,可能它會使用HashSet或其他基於散列的數據結構的一些複雜/並行變體來比較對象並確保獨立性。

聲明的HashSet(),equals()方法,canEqual(),和比較()方法解決我的問題:

override def hashCode(): Int = { 
    41 * (41 + name.hashCode) + tel.hashCode 
} 

override def equals(other: Any) = other match { 
    case other: Person => 
    (other canEqual this) && 
    (this.name == other.name) && (this.tel == other.tel) 
    case _ => 
    false 
} 

def canEqual(other: Any) = other.isInstanceOf[Person] 

def compare(that: Person): Int = { 
    this.name compare that.name 
}