2015-12-14 34 views
0

我需要對RDD中的鍵進行排序,但沒有自然排序順序(不是升序或降序)。我甚至不知道如何編寫比較器來做到這一點。假設我有一張蘋果,梨,橘子和葡萄的地圖,我想用橘子,蘋果,葡萄和梨來分類。在RDD中排序鍵

關於如何在Spark/Scala中做到這一點的任何想法?謝謝!

回答

4

在Scala中,您需要查找Ordering[T]特徵而不是Comparator接口 - 這幾乎是一種表面上的區別,因此重點在於數據的屬性,而不是比較兩個數據實例的東西。實現這一特性需要定義方法compare(T,T)。枚舉比較的一個非常明確的版本可能是:

object fruitOrdering extends Ordering[String] { 
    def compare(lhs: String, rhs: String): Int = (lhs, rhs) match { 
    case ("orange", "orange") => 0 
    case ("orange", _)  => -1 
    case ("apple", "orange") => 1 
    case ("apple", "apple") => 0 
    case ("apple", _)   => -1 
    case ("grape", "orange") => 1 
    case ("grape", "apple") => 1 
    case ("grape", "grape") => 0 
    case ("grape", _)   => -1 
    case ("pear", "orange") => 1 
    case ("pear", "apple") => 1 
    case ("pear", "grape") => 1 
    case ("pear", "pear")  => 0 
    case ("pear", _)   => -1 
    case _ => 0 
    } 
} 

或者稍微適應zero323's answer

object fruitOrdering2 extends Ordering[String] { 
    private val values = Seq("orange", "apple", "grape", "pear") 
    // generate the map based off of indices so we don't have to worry about human error during updates 
    private val ordinalMap = values.zipWithIndex.toMap.withDefaultValue(Int.MaxValue) 

    def compare(lhs: String, rhs: String): Int = ordinalMap(lhs).compare(ordinalMap(rhs)) 
} 

現在,你有Ordering[String]一個實例,你需要通知sortBy方法使用而不是內置的。如果你看一下簽名RDD#sortBy你會看到完整的簽名是

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 

,在第二個參數列表隱Ordering[K]編譯器爲預先定義的排序通常擡頭 - 這是它如何知道什麼自然排序應該是。但是,任何隱式參數都可以賦予顯式值。請注意,如果您提供一個隱式值,則需要提供所有值,因此在這種情況下,我們還需要提供ClassTag[K]。這總是由編譯器生成,但可以使用scala.reflect.classTag明確生成。

指定所有這一切,調用看起來像:

import scala.reflect.classTag 
rdd.sortBy { case (key, _) => key }(fruitOrdering, classOf[String]) 

這仍然相當混亂,不過,是不是?幸運的是,我們可以使用隱式類去除很多垃圾。下面是我使用的相當普遍的一個片段:

package com.example.spark 

import scala.reflect.ClassTag 
import org.apache.spark.rdd.RDD 

package object implicits { 
    implicit class RichSortingRDD[A : ClassTag](underlying: RDD[A]) { 
    def sorted(implicit ord: Ordering[A]): RDD[A] = 
     underlying.sortBy(identity)(ord, implicitly[ClassTag[A]]) 

    def sortWith(fn: (A, A) => Int): RDD[A] = { 
     val ord = new Ordering[A] { def compare(lhs: A, rhs: A): Int = fn(lhs, rhs) } 
     sorted(ord) 
    } 
    } 

    implicit class RichSortingPairRDD[K : ClassTag, V](underlying: RDD[(K, V)]) { 
    def sortByKey(implicit ord: Ordering[K]): RDD[(K, V)] = 
     underlying.sortBy { case (key, _) => key } (ord, implicitly[ClassTag[K]]) 

    def sortByKeyWith(fn: (K, K) => Int): RDD[(K, V)] = { 
     val ord = new Ordering[K] { def compare(lhs: K, rhs: K): Int = fn(lhs, rhs) } 
     sortByKey(ord) 
    } 
    } 
} 

而且在行動:

import com.example.spark.implicits._ 

val rdd = sc.parallelize(Seq(("grape", 0.3), ("apple", 5.0), ("orange", 5.6))) 
rdd.sortByKey(fruitOrdering).collect 
// Array[(String, Double)] = Array((orange,5.6), (apple,5.0), (grape,0.3)) 

rdd.sortByKey.collect // Natural ordering by default 
// Array[(String, Double)] = Array((apple,5.0), (grape,0.3), (orange,5.6)) 

rdd.sortWith(_._2 compare _._2).collect // sort by the value instead 
// Array[(String, Double)] = Array((grape,0.3), (apple,5.0), (orange,5.6)) 
+0

真棒回答。謝謝 – user1660256

1

我不知道的火花,而是純粹的Scala集合,這將是

_.sortBy(_.fruitType) 

例如,

val l: List[String] = List("the", "big", "bang") 
val sortedByFirstLetter = l.sortBy(_.head) 
// List(big, bang, the) 
+0

感謝您的快速回復,但我不太明白。這是如何讓我指定我想先來哪一塊水果,等等? – user1660256

+0

對不起,我不小心編寫了'groupBy'而不是'sortBy'。更新了答案。 – VasyaNovikov

1

中有火花sortBy方法,它允許你定義一個任意排序以及是否需要升序或降序。例如。

scala> val rdd = sc.parallelize(Seq (("a", 1), ("z", 7), ("p", 3), ("a", 13) )) 
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[331] at parallelize at <console>:70 

scala> rdd.sortBy(_._2, ascending = false) .collect.mkString("\n") 
res34: String = 
(a,13) 
(z,7) 
(p,3) 
(a,1) 

scala> rdd.sortBy(_._1, ascending = false) .collect.mkString("\n") 
res35: String = 
(z,7) 
(p,3) 
(a,1) 
(a,13) 

scala> rdd.sortBy 

def sortBy[K](f: T => K, ascending: Boolean, numPartitions: Int)(implicit ord: scala.math.Ordering[K], ctag: scala.reflect.ClassTag[K]): RDD[T] 

最後一部分告訴你sortBy的簽名是什麼。在前面的例子中使用的排序是由該對的第一部分和第二部分組成。

編輯:回答得太快,沒有檢查你的問題,對不起......總之,你會定義排序就像在你的例子:

def myord(fruit:String) = fruit match { 
    case "oranges" => 1 ; 
    case "apples" => 2; 
    case "grapes" =>3; 
    case "pears" => 4; 
    case _ => 5} 

val rdd = sc.parallelize(Seq("apples", "oranges" , "pears", "grapes" , "other")) 

然後,排序的結果將是:

scala> rdd.sortBy[Int](myord, ascending = true).collect.mkString("\n") 
res1: String = 
oranges 
apples 
grapes 
pears 
other 
+0

這看起來非常接近我想要的。但是,而不是一個序列,我有一個水果地圖(「蘋果」 - >「好」,「橙子」 - >「更好」,「梨」 - >「偉大」)。那麼,我將如何排序k,v中的k? – user1660256

2

如果你能描述的順序的唯一方法是枚舉然後簡單地列舉:

val order = Map("orange" -> 0L, "apple" -> 1L, "grape" -> 2L, "pear" -> 3L) 
val rdd = sc.parallelize(Seq(("grape", 0.3), ("apple", 5.0), ("orange", 5.6))) 
val sorted = rdd.sortBy{case (key, _) => order.getOrElse(key, Long.MaxValue)} 
sorted.collect 

// Array[(String, Double)] = Array((orange,5.6), (apple,5.0), (grape,0.3)) 
+0

是的,我在想那樣的事情!但我正在使用鍵 - >值對。我將在哪裏添加枚舉器? – user1660256

+0

如果它會被重用,你應該考慮創建一個廣播變量,否則上面提供的代碼應該工作得很好。在閉包中引用的任何變量都會自動序列化並傳輸給所有工作人員。 – zero323