val rdd = sc.makeRDD(Seq("paul", "jim", "joe", "mary", "sean", "peter", "lucy"))
val startIndex = 1
val endIndex = 5
val shortRdd=rdd.zipWithIndex().filter { case (_, idx) => idx >= startIndex && idx <= endIndex }.map(p=>p._1)
shortRdd.count
shortRdd.foreach(println)
第一步:讓我們看看最新的RDD內:
rdd.foreach(println)
peter
lucy
jim
joe
paul
mary
sean
步驟2:應用一個變換到附加索引,注意的指標值現在被應用到每一行。
rdd.zipWithIndex().foreach(println)
(peter,5)
(jim,1)
(joe,2)
(paul,0)
(mary,3)
(sean,4)
(lucy,6)
第三步:在索引位置應用過濾器,拉開始和結束索引位置
rdd.zipWithIndex().filter { case (_, idx) => idx >= startIndex && idx <= endIndex }.foreach(println)
(mary,3)
(sean,4)
(jim,1)
(peter,5)
(joe,2)
第四步之間指標:映射回單個元素中的每一行
rdd.zipWithIndex().filter { case (_, idx) => idx >= startIndex && idx <= endIndex }.map(p=>p._1).foreach(println)
mary
jim
joe
peter
sean
我執行RDD上的這個過程與100k或更多的行沒有任何問題。讓我知道這是如何執行更大的RDD。
那就是它!保羅。
您將如何選擇RDD的哪一部分應該複製到較小的RDD? – Yaron
請擴展您的用例。是的,某些行可能會導致問題,但是您真的可以通過索引或內容來識別這些行嗎? –