2015-10-26 48 views
0

我是Spark的新生。我有一個問題,但我不知道如何解決它。我在RDD數據如下:如何根據特定邏輯從另一個RDD生成新的RDD

(1,{A,B,C,D}) 
(2,{E,F,G}) 
...... 

我知道RDDS是不可改變的,但是,我想我的RDD轉變成一個新的RDD,看起來像這樣:

11 A,B 
12 B,C 
13 C,D 
21 E,F 
22 F,G 
...... 

我怎麼能生成新的密鑰和提取相鄰的元素?

回答

1

假設您的收藏類似於List的東西,你可以這樣做:

val rdd2 = rdd1.flatMap { case (key, values) => 
    for (value <- values.sliding(2).zipWithIndex) 
    yield (key.toString + value._2, value._1) 
} 

我們這裏做的是通過你的列表中的值迭代,對應用大小爲2的滑動窗口元素,用整數索引壓縮元素,最後輸出由附加了列表索引(其值爲滑動元素)的原始索引鍵入的元組列表。我們也在這裏使用flatMap來將結果平鋪到自己的記錄中。

當火花shell中運行,我看到你的例子下面的輸出:

scala> val rdd1 = sc.parallelize(Array((1,List("A","B","C","D")), (2,List("E","F","G")))) 
rdd1: org.apache.spark.rdd.RDD[(Int, List[String])] = ParallelCollectionRDD[0] at parallelize at <console>:21 

scala> val rdd2 = rdd1.flatMap { case (key, values) => for (value <- values.sliding(2).zipWithIndex) yield (key.toString + value._2, value._1) } 
rdd2: org.apache.spark.rdd.RDD[(String, Seq[String])] = MapPartitionsRDD[1] at flatMap at <console>:23 

scala> rdd2.foreach(println) 
... 
(10,List(A, B)) 
(11,List(B, C)) 
(12,List(C, D)) 
(20,List(E, F)) 
(21,List(F, G)) 

的一個注意到這是輸出鍵(如1011)將有3位,如果你有11個或更多的元素。例如,對於輸入密鑰1,您將在第11個元素上具有輸出密鑰110。不確定這是否適合您的用例,但它似乎是您的請求的合理擴展。根據你的輸出密鑰方案,我實際上會建議一些不同的東西(比如可能在密鑰和元素之間添加連字符?)。這會稍後阻止碰撞,因爲對於這兩個密鑰,您將看到2-1021-0而不是210

+0

我明白了。非常感謝。您完美解決我的問題 –

+0

@ SN.JS請將問題標記爲已回答(點擊此答案左側的複選標記)。 –