2016-09-07 45 views
0

我有以下RDD [字符串]:如何將範圍RDD的元素複製到一個更小的RDD

val rdd = sc.makeRDD(Seq("paul", "jim,", "joe", "mary", "sean", "peter", "lucy")) 

我想做什麼就能做的是能夠產生一個smallerRDD將上述主rdd中的一系列行復制到較小的rdd中。

使用案例: 在火花異常情況下,通過RDD旋轉時,RDD中的某些行/記錄可能會出現問題。

能夠以編程方式將一個複製到另一個使用有用的功能,因爲我無法找到一個罐裝rdd方法來做到這一點。 請參閱下面的解決方案。

+3

您將如何選擇RDD的哪一部分應該複製到較小的RDD? – Yaron

+1

請擴展您的用例。是的,某些行可能會導致問題,但是您真的可以通過索引或內容來識別這些行嗎? –

回答

0
val rdd = sc.makeRDD(Seq("paul", "jim", "joe", "mary", "sean", "peter", "lucy")) 

val startIndex = 1 
val endIndex = 5 
val shortRdd=rdd.zipWithIndex().filter { case (_, idx) => idx >= startIndex && idx <= endIndex }.map(p=>p._1) 
shortRdd.count 
shortRdd.foreach(println) 

第一步:讓我們看看最新的RDD內:

rdd.foreach(println) 
peter 
lucy 
jim 
joe 
paul 
mary 
sean 

步驟2:應用一個變換到附加索引,注意的指標值現在被應用到每一行。

rdd.zipWithIndex().foreach(println) 
(peter,5) 
(jim,1) 
(joe,2) 
(paul,0) 
(mary,3) 
(sean,4) 
(lucy,6) 

第三步:在索引位置應用過濾器,拉開始和結束索引位置

rdd.zipWithIndex().filter { case (_, idx) => idx >= startIndex && idx <= endIndex }.foreach(println) 
(mary,3) 
(sean,4) 
(jim,1) 
(peter,5) 
(joe,2) 

第四步之間指標:映射回單個元素中的每一行

rdd.zipWithIndex().filter { case (_, idx) => idx >= startIndex && idx <= endIndex }.map(p=>p._1).foreach(println) 
mary 
jim 
joe 
peter 
sean 

我執行RDD上的這個過程與100k或更多的行沒有任何問題。讓我知道這是如何執行更大的RDD。

那就是它!保羅。