我正在嘗試按特定順序將多個RDD的字符串合併到RDD行中。我試圖創建一個Map[String, RDD[Seq[String]]]
(其中Seq
只包含一個元素),然後將它們合併到一個RDD[Row[String]]
,但它似乎不起作用(內容RDD[Seq[String]]
丟失)。有人有什麼想法嗎?按特定順序合併多個RDD
val t1: StructType
val mapFields: Map[String, RDD[Seq[String]]]
var ordRDD: RDD[Seq[String]] = context.emptyRDD
t1.foreach(field => ordRDD = ordRDD ++ mapFiels(field.name))
val rdd = ordRDD.map(line => Row.fromSeq(line))
編輯: 使用壓縮功能導致火花例外,因爲我的RDDS沒有相同數量的每個分區元素。我不知道如何確保它們在每個分區中都具有相同數量的元素,因此我只是使用索引對它們進行壓縮,然後使用ListMap
以良好順序加入它們。也許有一個關於mapPartitions
函數的技巧,但我還不夠了解Spark API。
val mapFields: Map[String, RDD[String]]
var ord: ListMap[String, RDD[String]] = ListMap()
t1.foreach(field => ord = ord ++ Map(field.name -> mapFields(field.name)))
// Note : zip = SparkException: Can only zip RDDs with same number of elements in each partition
//val rdd: RDD[Row] = ord.toSeq.map(_._2.map(s => Seq(s))).reduceLeft((rdd1, rdd2) => rdd1.zip(rdd2).map{ case (l1, l2) => l1 ++ l2 }).map(Row.fromSeq)
val zipRdd = ord.toSeq.map(_._2.map(s => Seq(s)).zipWithIndex().map{ case (d, i) => (i, d) })
val concatRdd = zipRdd.reduceLeft((rdd1, rdd2) => rdd1.join(rdd2).map{ case (i, (l1, l2)) => (i, l1 ++ l2)})
val rowRdd: RDD[Row] = concatRdd.map{ case (i, d) => Row.fromSeq(d) }
val df1 = spark.createDataFrame(rowRdd, t1)
你是什麼意思是「合併」,你的意思是每個RDD將「貢獻」一個_column_的結果?如果是這樣 - 如果不是所有的RDD都具有相同的尺寸,會發生什麼? –
是的,每個「RDD」將成爲一列。 RDD應該具有相同的大小。我認爲沒有必要考慮這種情況。 – belgacea