假設您的收藏類似於List
的東西,你可以這樣做:
val rdd2 = rdd1.flatMap { case (key, values) =>
for (value <- values.sliding(2).zipWithIndex)
yield (key.toString + value._2, value._1)
}
我們這裏做的是通過你的列表中的值迭代,對應用大小爲2的滑動窗口元素,用整數索引壓縮元素,最後輸出由附加了列表索引(其值爲滑動元素)的原始索引鍵入的元組列表。我們也在這裏使用flatMap來將結果平鋪到自己的記錄中。
當火花shell中運行,我看到你的例子下面的輸出:
scala> val rdd1 = sc.parallelize(Array((1,List("A","B","C","D")), (2,List("E","F","G"))))
rdd1: org.apache.spark.rdd.RDD[(Int, List[String])] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> val rdd2 = rdd1.flatMap { case (key, values) => for (value <- values.sliding(2).zipWithIndex) yield (key.toString + value._2, value._1) }
rdd2: org.apache.spark.rdd.RDD[(String, Seq[String])] = MapPartitionsRDD[1] at flatMap at <console>:23
scala> rdd2.foreach(println)
...
(10,List(A, B))
(11,List(B, C))
(12,List(C, D))
(20,List(E, F))
(21,List(F, G))
的一個注意到這是輸出鍵(如10
,11
)將有3位,如果你有11個或更多的元素。例如,對於輸入密鑰1
,您將在第11個元素上具有輸出密鑰110
。不確定這是否適合您的用例,但它似乎是您的請求的合理擴展。根據你的輸出密鑰方案,我實際上會建議一些不同的東西(比如可能在密鑰和元素之間添加連字符?)。這會稍後阻止碰撞,因爲對於這兩個密鑰,您將看到2-10
和21-0
而不是210
。
我明白了。非常感謝。您完美解決我的問題 –
@ SN.JS請將問題標記爲已回答(點擊此答案左側的複選標記)。 –