2017-07-14 156 views
0

我使用的火花與sc.textFile(fileLocation),需要能夠快速下降的第一和最後一行閱讀的文本文件(他們可能是一個首或尾)。我發現的好方法返回第一和最後一排,但沒有很好的一個刪除它們。這可能嗎?刪除的RDD的第一個和最後一行星火

+0

如果他們按照一定的模式,您可以使用過濾器。 – jamborta

+0

他們不這樣做,這是該工具的目的... – bendl

回答

3

這樣做將是zipWithIndex,然後用指標篩選出的記錄的一種方式0count - 1

// We're going to perform multiple actions on this RDD, 
// so it's usually better to cache it so we don't read the file twice 
rdd.cache() 

// Unfortunately, we have to count() to be able to identify the last index 
val count = rdd.count() 
val result = rdd.zipWithIndex().collect { 
    case (v, index) if index != 0 && index != count - 1 => v 
} 

注意,這可能是在性能方面相當昂貴的(如果緩存RDD - 你使用內存;如果你不使用,你可以讀RDD兩次)。所以,如果你有一個基於其內容(例如,如果你知道所有的記錄,但這些應包含一定的模式)的識別這些記錄的任何方式,使用filter可能會更快。

+0

我正與一個可能的解決方案,以更新的問題:我用'拿(data.count - 1)'然後篩選其中'行! = data.first'。對於封面下的火花是如何工作的相當無知,哪種解決方案會更快?這些數據將以任何方式存儲在內存中。 – bendl

+2

有一個非常顯着的區別 - 'take(data.count - 1)'會將整個RDD收集到_driver_內存(一臺機器!),對於大型RDD,這會導致OOM;另一方面,緩存保持RDD _distributed_並將其_partitions_加載到工作節點的內存中(其中有很多潛在的) - 所以你不太可能獲得OOM並且任何一種方式都會更快(數據將不必傳輸給驅動程序)。你只能用'take'來收集相對較少的記錄。 –

2

這可能是一個更輕的版本:

val rdd = sc.parallelize(Array(1,2,3,4,5,6), 3) 
val partitions = rdd.getNumPartitions 
val rddFirstLast = rdd.mapPartitionsWithIndex { (idx, iter) => 
    if (idx == 0) iter.drop(1) 
    else if (idx == partitions - 1) iter.sliding(2).map(_.head) 
    else iter 
} 

scala> rddFirstLast.collect() 
res3: Array[Int] = Array(2, 3, 4, 5) 
+0

打火機如何?內存還是計算? – bendl

+0

兩者。您的版本將收集數據(內存)並比較每一行(計算)。這一個保持數據分佈並依賴於RDD的外部順序。所以沒有進行比較和數據保持分佈。 – jamborta

+0

我明白了。在這裏不要開始一場火焰戰爭,但是在你的回答和@Tzach Zhohar的 – bendl

相關問題