
Spark - all data ends up in one partition after reduceByKey

I have this simple Spark program, and I want to know why all the data ends up in a single partition.

val l = List((30002,30000), (50006,50000), (80006,80000), 
      (4,0), (60012,60000), (70006,70000), 
      (40006,40000), (30012,30000), (30000,30000), 
      (60018,60000), (30020,30000), (20010,20000), 
      (20014,20000), (90008,90000), (14,0), (90012,90000), 
      (50010,50000), (100008,100000), (80012,80000), 
      (20000,20000), (30010,30000), (20012,20000), 
      (90016,90000), (18,0), (12,0), (70016,70000), 
      (20,0), (80020,80000), (100016,100000), (70014,70000), 
      (60002,60000), (40000,40000), (60006,60000), 
      (80000,80000), (50008,50000), (60008,60000), 
      (10002,10000), (30014,30000), (70002,70000), 
      (40010,40000), (100010,100000), (40002,40000), 
      (20004,20000), 
      (10018,10000), (50018,50000), (70004,70000), 
      (90004,90000), (100004,100000), (20016,20000)) 

val l_rdd = sc.parallelize(l, 2) 

// print each item and index of the partition it belongs to 
l_rdd.mapPartitionsWithIndex((index, iter) => { 
    iter.toList.map(x => (index, x)).iterator 
}).collect.foreach(println) 

// reduce on the second element of the list. 
// alternatively you can use aggregateByKey 
val l_reduced = l_rdd.map(x => { 
        (x._2, List(x._1)) 
        }).reduceByKey((a, b) => {b ::: a}) 

// print the reduced results along with its partition index 
l_reduced.mapPartitionsWithIndex((index, iter) => { 
     iter.toList.map(x => (index, x._1, x._2.size)).iterator 
}).collect.foreach(println) 

When you run this, you will see that the data (l_rdd) is distributed across two partitions. After the reduce, the resulting RDD (l_reduced) also has two partitions, but all the data sits in one partition (index 0) and the other one is empty. This happens even when the data is large (several GB). Shouldn't l_reduced also be distributed across the two partitions?
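An equivalent quick check (a minimal sketch using glom, which is not in the program above and simply collects each partition into an array) shows the same skew:

// number of records held by each partition of the reduced RDD
l_reduced.glom().map(_.length).collect().foreach(println)
// with the data above this prints 11 and 0 -- every key lands in partition 0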

I am using Spark 1.6.1 and have not changed the ShuffleManager.

Answers

val l_reduced = l_rdd.map(x => { 
        (x._2, List(x._1)) 
        }).reduceByKey((a, b) => {b ::: a}) 

Referring to the code snippet above, you are partitioning by the second field of the RDD, and every number in that second field ends in 0.

When the HashPartitioner is invoked, the partition number for a record is decided by the following function:

def getPartition(key: Any): Int = key match { 
    case null => 0 
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) 
    } 

And Utils.nonNegativeMod is defined as follows:

def nonNegativeMod(x: Int, mod: Int): Int = { 
    val rawMod = x % mod 
    rawMod + (if (rawMod < 0) mod else 0) 
    } 

Let's see what happens when we apply the two pieces of logic above to your input:

scala> l.map(_._2.hashCode % 2) // numPartitions = 2 
res10: List[Int] = List(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) 

Therefore, all the records end up in partition 0.
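You can see the same thing by asking a HashPartitioner directly; a minimal sketch against the original list l:

import org.apache.spark.HashPartitioner

val hp = new HashPartitioner(2)
// every key (the second element of each tuple) is an even Int, so its hashCode
// is even and nonNegativeMod(hashCode, 2) is always 0
l.map { case (_, key) => hp.getPartition(key) }.distinct // List(0)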

You can fix this by repartitioning:

val l_reduced = l_rdd.map(x => { 
        (x._2, List(x._1)) 
        }).reduceByKey((a, b) => {b ::: a}).repartition(2) 

This gives:

(0,100000,4) 
(0,10000,2) 
(0,0,5) 
(0,20000,6) 
(0,60000,5) 
(0,80000,4) 
(1,50000,4) 
(1,30000,6) 
(1,90000,4) 
(1,70000,5) 
(1,40000,4) 

Alternatively, you can create a custom partitioner.
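For example, a custom partitioner for this particular key space could look roughly like the sketch below; the class name and the divide-by-10000 scheme are illustrative assumptions that exploit the fact that every key here is a multiple of 10000:

import org.apache.spark.Partitioner

// illustrative partitioner: spread the keys 0, 10000, 20000, ... across partitions
// by dividing by 10000 before taking the modulus, instead of hashing them
class TenThousandsPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case null   => 0
    case k: Int => (k / 10000) % numPartitions   // 0 -> 0, 10000 -> 1, 20000 -> 0, ...
    case other  => ((other.hashCode % numPartitions) + numPartitions) % numPartitions
  }
}

// use it for the shuffle inside reduceByKey
val l_custom = l_rdd.map(x => (x._2, List(x._1)))
                    .reduceByKey(new TenThousandsPartitioner(2), (a, b) => b ::: a)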


Unless specified otherwise, partitioning is done based on the hash code of the keys involved, on the assumption that the hash codes will give a relatively even distribution. In this case, your hash codes are all even numbers, so everything goes to partition 0.

If this is truly representative of your dataset, there is an overload of reduceByKey that takes a partitioner as well as the reduce function. I would suggest supplying an alternative partitioning algorithm for a dataset like this.
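As a minimal sketch of that overload, and assuming a range-based split is acceptable for these keys, a RangePartitioner could be passed in instead of the default HashPartitioner:

import org.apache.spark.RangePartitioner

val keyed = l_rdd.map(x => (x._2, List(x._1)))
// RangePartitioner samples the keys and assigns contiguous key ranges to partitions,
// so keys whose hash codes all share the same parity still spread out
val l_ranged = keyed.reduceByKey(new RangePartitioner(2, keyed), (a, b) => b ::: a)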
