0
我指的是網絡鏈接http://codingjunkie.net/spark-secondary-sort/ 在我的火花招聘中實施二次排序。輔助排序使用Apache Spark 1.6
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.Partitioner
package Ssort {
case class DeviceKey(serialNum: String, eventDate: String, EventTs: Long) {
implicit def orderingBySerialNum[A <: DeviceKey] : Ordering[A] = {
Ordering.by(fk => (fk.serialNum, fk.eventDate, fk.EventTs * -1))
}
}
class DeviceKeyPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
override def numPartitions: Int = partitions
override def getPartition(key: Any): Int = {
val k = key.asInstanceOf[DeviceKey]
k.serialNum.hashCode() % numPartitions
}
}
object SparkApp {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Application").setMaster("local[2]")
val sc = new SparkContext(conf)
val t = sc.parallelize(List(((DeviceKey("2","100",1),1)),(DeviceKey("2","100",3),1)), 1)
t.repartitionAndSortWithinPartitions(partitioner)
}
}
}
我得到錯誤的 - 值repartitionAndSortWithinPartitions不org.apache.spark.rdd.RDD [(DeviceKey,智力)]的成員
有人可以看看嗎?
感謝&問候 帕裏