這個答案受到了 Daniel 回答的一些啓發,但提供了一個完整的實現(使用「pimp my library」模式),並附上示例,方便大家直接複製粘貼使用 :)
import RDDConversions._
/** Common interface for the wrapper ("pimp my library") classes below that
 *  enrich an RDD with extra operations. Exposing the underlying RDD here
 *  lets a single implicit conversion unwrap any wrapper back to its RDD.
 */
trait RDDWrapper[T] {
def rdd: RDD[T]
}
/** Enriches a pair RDD with a sort that randomizes the placement of equal
 *  keys, so the sorted output is spread (load-balanced) across partitions.
 *
 *  The deprecated view bound `K <% Ordered[K]` has been replaced by an
 *  `Ordering` context bound; `Ordered.orderingToOrdered` still supplies the
 *  implicit view wherever Spark's sort machinery requires `Ordered`.
 *
 *  @tparam K key type; requires an implicit [[scala.math.Ordering]]
 *  @tparam V value type
 */
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class RichPairRDD[K : Ordering : ClassManifest, V: ClassManifest](
    rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
  // Here we use a single Long to try to ensure the sort is balanced,
  // but for really large dataset, we may want to consider
  // using a tuple of many Longs or even a GUID
  def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
    // Tag each key with a random Long so equal keys get distinct composite
    // keys, sort on the composite, then rebalance and strip the tag.
    rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
      .grouped(numPartitions).map(t => (t._1._1, t._2))
}
/** Enriches any RDD with `grouped`, which redistributes the elements into
 *  `size` partitions while preserving their existing order.
 */
case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
  /** Repartitions into `size` partitions, keeping element order.
   *
   *  Works by assigning each element a global index, then range-partitioning
   *  on that index.
   */
  def grouped(size: Int): RDD[T] = {
    // TODO Version where withIndex is cached
    val withIndex = rdd.mapPartitions(_.zipWithIndex)
    // startValues(i) is the global index of the first element of partition i,
    // i.e. the exclusive prefix sum of per-partition counts; startValues.last
    // is the total element count. Using counts (iter.size) instead of the
    // last zipWithIndex index fixes two bugs in the earlier version:
    //  - an off-by-one that made global indices collide at partition
    //    boundaries (last index is count - 1, and the scan only added 1 once
    //    overall instead of once per partition);
    //  - a NoSuchElementException from calling .last on an empty partition.
    // toArray() is deprecated on RDD; collect() is the supported equivalent.
    val startValues =
      withIndex.mapPartitionsWithIndex((i, iter) =>
        Iterator((i, iter.size))).collect().toList
        .sortBy(_._1).map(_._2.toLong).scan(0L)(_ + _)
    withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
      case (value, index) => (startValues(i) + index.toLong, value)
    })
    .partitionBy(new Partitioner {
      def numPartitions: Int = size
      // Global indices run 0 until startValues.last, so this maps them
      // evenly onto 0 until size without going out of range.
      def getPartition(key: Any): Int =
        (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
    })
    .map(_._2)
  }
}
然後在另一個文件中,我們有
// TODO modify above to be implicit class, rather than have implicit conversions
/** Implicit conversions that enrich plain RDDs with the wrapper classes, and
 *  unwrap wrapper results back to plain RDDs so enriched methods can chain.
 */
// TODO modify above to be implicit class, rather than have implicit conversions
object RDDConversions {
  implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] =
    new RichRDD[T](rdd)
  // Context bound on Ordering replaces the deprecated view bound
  // `K <% Ordered[K]`; where an `Ordered` view is still needed (e.g. by
  // Spark's sortByKey), `Ordered.orderingToOrdered` derives it implicitly.
  implicit def toRichPairRDD[K : Ordering : ClassManifest, V: ClassManifest](
      rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd)
  implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}
那麼針對你的使用場景(假定數據已經排好序),你只需要:
import RDDConversions._
yourRdd.grouped(2)
免責聲明:未經測試,是剛纔直接在 SO 的回答框裏寫出來的。
嗨!你這裏調用了 partitionBy,但我在 Spark 文檔中找不到這個方法。定義了一個新的分區器之後,我該如何把現有的 RDD 重新分區到一組新的分區上?謝謝! –
`partitionBy` 定義在 [PairRDDFunctions](http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.PairRDDFunctions) 中,因此可以在任何 `RDD[(K, V)]` 上調用它。這個類裏還隱藏着很多基礎方法,值得去看看! –
謝謝 Daniel!我會去查閱的。 –