我有一個RDD數組:Array[RDD[(String, Double)]]
,如何合併這些RDD到RDD[String, Array[Double]]
。例如:如何合併RDD數組
RDD Array: [[('x', 1), ('y', 2)], [('x', 3), ('y', 4)],...] =>
RDD: [('x', [1, 3,...]), ('y', [2, 4, ...])]
任何幫助表示讚賞!由於
我有一個RDD數組:Array[RDD[(String, Double)]]
,如何合併這些RDD到RDD[String, Array[Double]]
。例如:如何合併RDD數組
RDD Array: [[('x', 1), ('y', 2)], [('x', 3), ('y', 4)],...] =>
RDD: [('x', [1, 3,...]), ('y', [2, 4, ...])]
任何幫助表示讚賞!由於
val mergeIntoOne: RDD[(String, Double)] = array.fold(sparkSession.sparkContext.emptyRDD[(String, Double)])(_ ++ _) val groupByKeys: RDD[(String, Iterable[Double])] = mergeIntoOne.groupByKey() val sortedValues = groupByKeys.mapValues(_.toList.sorted)
謝謝!分組(第2行)後,'groupByKeys'數組中的元素順序與原始數組順序相同嗎?或在摺疊期間洗牌? – hlltc
我想排序是唯一可以肯定的方法。訂單可能會有所不同,因爲由Spark重新分區的數據 –
假設你沒有在每個RDD重複鍵,那麼你可以在所有RDDS嘗試foldLeft在Array[RDD]
與fullOuterJoin
:
val rdd1 = sc.parallelize(Seq(("x", 1.0), ("y", 2.0)))
val rdd2 = sc.parallelize(Seq(("x", 3.0), ("y", 4.0)))
val rdd3 = sc.parallelize(Seq(("x", 5.0), ("y", 6.0)))
val rdds = Array(rdd1, rdd2, rdd3)
val startRdd = sc.parallelize(Seq[(String, Seq[Option[Double]])]())
(rdds.foldLeft(startRdd)(
(rdd1, rdd2) => rdd1.fullOuterJoin(rdd2).mapValues(
p => p._1.getOrElse(Seq[Option[Double]]()) :+ p._2
)
).mapValues(_.collect{ case Some(x) => x }).collect)
// res15: Array[(String, Seq[Double])] = Array((x,List(1.0, 3.0, 5.0)), (y,List(2.0, 4.0, 6.0)))
這取決於您要使用它,但你可以使用一個for循環和工會陣列
scala> var a = Array(("a1",1.1))
a: Array[(String, Double)] = Array((a1,1.1))
scala> var b = Array(("a2",1.2))
b: Array[(String, Double)] = Array((a2,1.2))
scala> for (i <- 0 to b.length) {
| a = a:+b(i)}
scala> a
res2: Array[(String, Double)] = Array((a1,1.1), (a2,1.2))
我在這裏沒有看到任何RDD。你有沒有注意到他問過關於Apache Spark的問題? –
你是怎麼RDD的陣列?我想它應該是Array的RDD,請確認。 –