
I am following the article at http://codingjunkie.net/spark-secondary-sort/ to implement secondary sort in my Spark job, using Apache Spark 1.6.

package Ssort

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.Partitioner

// Composite key: sort by serialNum and eventDate ascending, EventTs descending.
case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) {
    implicit def orderingBySerialNum[A <: DeviceKey]: Ordering[A] = {
        Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))
    }
}

// Partition by serialNum only, so all events for a device land in the same partition.
class DeviceKeyPartitioner(partitions: Int) extends Partitioner {
    require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

    override def numPartitions: Int = partitions

    override def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[DeviceKey]
        k.serialNum.hashCode() % numPartitions
    }
}

object SparkApp {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Spark Application").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val t = sc.parallelize(List((DeviceKey("2", "100", 1), 1), (DeviceKey("2", "100", 3), 1)), 1)
        // The compiler rejects the next line (see the error below).
        t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2))
    }
}

I get the error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[(DeviceKey, Int)]

Could someone take a look?

Thanks & regards, Pari

Answer


This answer was shared by Yong Zhang on the spark-user mailing list.


The error message is indeed not very clear.

What went wrong is that repartitionAndSortWithinPartitions requires not just a pair RDD but an ordered one: an implicit Ordering for the key type must be in scope. Your case class used as the key has no such Ordering, so the implicit conversion that adds this method never applies.

You can either have the case class extend Ordered, or provide a companion object that supplies an implicit Ordering.
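As a minimal sketch of the second option (my addition, not part of the original reply), the Ordering can live in the companion object, where implicit search for Ordering[DeviceKey] will find it, leaving the case class itself unchanged:

case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long)

object DeviceKey {
    // Implicit search checks the companion object of the key type, so this
    // Ordering is picked up automatically by repartitionAndSortWithinPartitions.
    implicit val orderingByKey: Ordering[DeviceKey] =
        Ordering.by(k => (k.serialNum, k.eventDate, k.EventTs * -1))
}

The REPL session below demonstrates the first option, extending Ordered[DeviceKey]: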

scala> spark.version 
res1: String = 2.1.0 

scala> case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) extends Ordered[DeviceKey] { 
    | import scala.math.Ordered.orderingToOrdered 
    | def compare(that: DeviceKey): Int = 
    |  (this.serialNum, this.eventDate, this.EventTs * -1) compare 
    |  (that.serialNum, that.eventDate, that.EventTs * -1) 
    | } 
defined class DeviceKey 

scala> 

scala> val t = sc.parallelize(List((DeviceKey("2","100",1),1), (DeviceKey("2","100",3),1)), 1) 
t: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26 

scala> 

scala> class DeviceKeyPartitioner(partitions: Int) extends org.apache.spark.Partitioner { 
    |  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") 
    | 
    |  override def numPartitions: Int = partitions 
    | 
    |  override def getPartition(key: Any): Int = { 
    |  val k = key.asInstanceOf[DeviceKey] 
    |  k.serialNum.hashCode() % numPartitions 
    |  } 
    | } 
defined class DeviceKeyPartitioner 

scala> 

scala> t.repartitionAndSortWithinPartitions(new DeviceKeyPartitioner(2)) 
res0: org.apache.spark.rdd.RDD[(DeviceKey, Int)] = ShuffledRDD[1] at repartitionAndSortWithinPartitions at <console>:30
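For completeness (my addition, not in the original reply), collecting the result should show the secondary sort in effect: both keys share serialNum "2" and eventDate "100", so within their partition the records come out with EventTs descending, i.e. (DeviceKey(2,100,3),1) before (DeviceKey(2,100,1),1):

scala> res0.collect().foreach(println)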