2014-04-17 12 views
19

我是Spark新手。我有一個很大的元素數據集[RDD],我想把它分成兩個完全相同大小的分區,維護元素的順序。我試着用RangePartitioner如何爲每個分區具有相同數量的元素的相同大小的分區的Spark RDD定義自定義分區程序?

var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile)) 

這不給一個滿意的結果,因爲它大致劃分,但並不完全一樣大小的維護元素的順序。例如,如果有64個元素,我們使用 Rangepartitioner,那麼它分爲31個元素和33個元素。

我需要一個分區器,這樣我可以得到第一個32個元素中的一半,另一半包含第二個32個元素。 您能否通過建議如何使用定製的分區程序來幫助我,使得我可以獲得相同的大小兩半,維護元素的順序?

+0

嗨!你在哪裏調用partitionBy,我在火花文檔中找不到這個方法。在定義一個新的分區器之後,我如何將現有的RDD分區到一組新的分區中? 謝謝! –

+0

'partitionBy'在[PairRDDFunctions](http://spark.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.PairRDDFunctions)中,因此您可以在任何情況下調用它'RDD [K,V]'。有很多隱藏在這個課堂中的基本方法,請查看! –

+0

謝謝丹尼爾!將檢查確定。 –

回答

16

Partitioner s通過爲分區分配密鑰來工作。您需要事先知道密鑰分配,或者查看所有密鑰,才能製作出這樣的分區器。這就是爲什麼Spark不能爲您提供一個。

一般來說,你不需要這樣的分區器。事實上,我不能拿出一個用例,我需要同等大小的分區。如果元素的數量是奇數呢?

無論如何,讓我們假設你有一個RDD,按序號Int s鍵入,你知道總共有多少個RDD。然後,你可以寫一個自定義Partitioner這樣的:

class ExactPartitioner[V](
    partitions: Int, 
    elements: Int) 
    extends Partitioner { 

    def getPartition(key: Any): Int = { 
    val k = key.asInstanceOf[Int] 
    // `k` is assumed to go continuously from 0 to elements-1. 
    return k * partitions/elements 
    } 
} 
+0

感謝Daniel.It的迴應。我工作的是一個算法,它在數據集中包含了很多元素。 – yh18190

+4

一旦你定義了這個新的類,你用它來調用它? RDD中的分區程序是一個val,我無法更改它,如果我使用此自定義分區程序定義新的RDD,如何使用方法創建它? –

+1

傾斜可能會引入額外的處理時間。它是通過讓一個執行器比另一個執行器掛起更多任務而引入的,或者因爲分區大小不一(一個任務運行時間比另一個長)。我通常會說超出預定的時間(遠遠超過可用的內核)和更小的分區,所以偏移消失在噪聲中。比試圖完全匹配單個執行器上的任務更好。 – YoYo

10

這個答案有從丹尼爾一些啓發,但(使用皮條客我的圖書館模式)用一個例子爲人民複製和粘貼需求提供全面落實:)

import RDDConversions._ 

trait RDDWrapper[T] { 
    def rdd: RDD[T] 
} 

// TODO View bounds are deprecated, should use context bounds 
// Might need to change ClassManifest for ClassTag in spark 1.0.0 
case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
    rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] { 
    // Here we use a single Long to try to ensure the sort is balanced, 
    // but for really large dataset, we may want to consider 
    // using a tuple of many Longs or even a GUID 
    def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] = 
    rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey() 
    .grouped(numPartitions).map(t => (t._1._1, t._2)) 
} 

case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] { 
    def grouped(size: Int): RDD[T] = { 
    // TODO Version where withIndex is cached 
    val withIndex = rdd.mapPartitions(_.zipWithIndex) 

    val startValues = 
     withIndex.mapPartitionsWithIndex((i, iter) => 
     Iterator((i, iter.toIterable.last))).toArray().toList 
     .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L) 

    withIndex.mapPartitionsWithIndex((i, iter) => iter.map { 
     case (value, index) => (startValues(i) + index.toLong, value) 
    }) 
    .partitionBy(new Partitioner { 
     def numPartitions: Int = size 
     def getPartition(key: Any): Int = 
     (key.asInstanceOf[Long] * numPartitions.toLong/startValues.last).toInt 
    }) 
    .map(_._2) 
    } 
} 

然後在另一個文件中,我們有

// TODO modify above to be implicit class, rather than have implicit conversions 
object RDDConversions { 
    implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] = 
    new RichRDD[T](rdd) 
    implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
    rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd) 
    implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd 
} 

那麼對於你只是想你的使用情況(假定它已經排序)

import RDDConversions._ 

yourRdd.grouped(2) 

免責聲明:未經測試,還挺剛纔寫的直入SO回答

+0

這個「partitionBy」方法在哪裏?我只能在JavaRDD中看到它,而不是在Scala RDD中。更新:OK在PairRDDFunctions中找到它(包含implicits) – javadba

相關問題