2017-08-04 107 views
1

我有一個RDD數組:Array[RDD[(String, Double)]],如何合併這些RDD到RDD[String, Array[Double]]。例如:如何合併RDD數組

RDD Array: [[('x', 1), ('y', 2)], [('x', 3), ('y', 4)],...] => 
RDD: [('x', [1, 3,...]), ('y', [2, 4, ...])] 

任何幫助表示讚賞!由於

+0

你是怎麼RDD的陣列?我想它應該是Array的RDD,請確認。 –

回答

1
  1. 你應該RDDS的數組合併成一個RDD(1號線)
  2. 集團通過他們的字符串值(2號線)
  3. 我看到預期的輸出進行排序,如果需要,你可以值進行排序(3號線)

val mergeIntoOne: RDD[(String, Double)] = array.fold(sparkSession.sparkContext.emptyRDD[(String, Double)])(_ ++ _) val groupByKeys: RDD[(String, Iterable[Double])] = mergeIntoOne.groupByKey() val sortedValues = groupByKeys.mapValues(_.toList.sorted)

+0

謝謝!分組(第2行)後,'groupByKeys'數組中的元素順序與原始數組順序相同嗎?或在摺疊期間洗牌? – hlltc

+0

我想排序是唯一可以肯定的方法。訂單可能會有所不同,因爲由Spark重新分區的數據 –

0

假設你沒有在每個RDD重複鍵,那麼你可以在所有RDDS嘗試foldLeft在Array[RDD]fullOuterJoin

val rdd1 = sc.parallelize(Seq(("x", 1.0), ("y", 2.0))) 
val rdd2 = sc.parallelize(Seq(("x", 3.0), ("y", 4.0))) 
val rdd3 = sc.parallelize(Seq(("x", 5.0), ("y", 6.0))) 

val rdds = Array(rdd1, rdd2, rdd3) 

val startRdd = sc.parallelize(Seq[(String, Seq[Option[Double]])]()) 

(rdds.foldLeft(startRdd)(
    (rdd1, rdd2) => rdd1.fullOuterJoin(rdd2).mapValues(
     p => p._1.getOrElse(Seq[Option[Double]]()) :+ p._2 
    ) 
).mapValues(_.collect{ case Some(x) => x }).collect) 
// res15: Array[(String, Seq[Double])] = Array((x,List(1.0, 3.0, 5.0)), (y,List(2.0, 4.0, 6.0))) 
+0

非常感謝!這是真正的嵌套RDD不支持。現在問題實際上是:Array [RDD(String,Double)],如何將RDD數組合併到RDD(String,Array [Double])? – hlltc

+0

您是否可以用您試圖解決的實際問題更新您的問題? – Psidom

-1

這取決於您要使用它,但你可以使用一個for循環和工會陣列

scala> var a = Array(("a1",1.1)) 
a: Array[(String, Double)] = Array((a1,1.1)) 

scala> var b = Array(("a2",1.2)) 
b: Array[(String, Double)] = Array((a2,1.2)) 

scala> for (i <- 0 to b.length) { 
| a = a:+b(i)} 

scala> a 
res2: Array[(String, Double)] = Array((a1,1.1), (a2,1.2)) 
+0

我在這裏沒有看到任何RDD。你有沒有注意到他問過關於Apache Spark的問題? –