Spark - all data ends up in one partition after reduceByKey

I have this simple Spark program, and I want to understand why all of the data ends up in a single partition.
val l = List((30002,30000), (50006,50000), (80006,80000),
(4,0), (60012,60000), (70006,70000),
(40006,40000), (30012,30000), (30000,30000),
(60018,60000), (30020,30000), (20010,20000),
(20014,20000), (90008,90000), (14,0), (90012,90000),
(50010,50000), (100008,100000), (80012,80000),
(20000,20000), (30010,30000), (20012,20000),
(90016,90000), (18,0), (12,0), (70016,70000),
(20,0), (80020,80000), (100016,100000), (70014,70000),
(60002,60000), (40000,40000), (60006,60000),
(80000,80000), (50008,50000), (60008,60000),
(10002,10000), (30014,30000), (70002,70000),
(40010,40000), (100010,100000), (40002,40000),
(20004,20000),
(10018,10000), (50018,50000), (70004,70000),
(90004,90000), (100004,100000), (20016,20000))
val l_rdd = sc.parallelize(l, 2)
// print each item and the index of the partition it belongs to
l_rdd.mapPartitionsWithIndex((index, iter) => {
  iter.toList.map(x => (index, x)).iterator
}).collect.foreach(println)
// reduce on the second element of each tuple
// (alternatively you could use aggregateByKey)
val l_reduced = l_rdd.map(x => {
  (x._2, List(x._1))
}).reduceByKey((a, b) => {b ::: a})
// print the reduced results along with their partition index
l_reduced.mapPartitionsWithIndex((index, iter) => {
  iter.toList.map(x => (index, x._1, x._2.size)).iterator
}).collect.foreach(println)
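For completeness, here is a small additional check one could run on the same RDDs (a sketch; partitioner shows which partitioner reduceByKey installed, and glom exposes each partition's contents as an array so the per-partition counts can be compared directly):

// inspect the partitioner of the reduced RDD and the number of elements per partition
println(l_reduced.partitioner)
l_reduced.glom().map(_.length).collect.foreach(println)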
When you run this program, you will see that the data (l_rdd) is spread across two partitions. After the reduce, the resulting RDD (l_reduced) also has two partitions, but all of the data sits in one partition (index 0) and the other one is empty. This happens even when the data is large (several GB). Shouldn't l_reduced also be distributed across the two partitions?

I am using Spark 1.6.1 and have not changed the ShuffleManager.
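In case it helps to reproduce what I am seeing, here is a minimal sketch (assuming reduceByKey falls back to the default HashPartitioner, since I never pass a partitioner explicitly) that prints the partition each reduced key would be assigned to:

import org.apache.spark.HashPartitioner

// simulate the default hash partitioning of the reduced keys across 2 partitions
val hp = new HashPartitioner(2)
l_reduced.keys.collect.foreach(k => println(s"key=$k -> partition ${hp.getPartition(k)}"))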