2016-03-09 57 views
1

我有一個Array[String],其中包含某些文件的路徑。 該數組是自動生成的,因此不能保證文件存在。Spark:讀取多個文件並忽略丟失的文件

我想讀取所有這些路徑,將現有的路徑加載到RDD中,並忽略不存在的路徑。

我試圖做到以下幾點:

import scala.util.Try 

val arrayOfFilePaths: Array[String] = ["path1", "path2", "path3", "path4"] 
val allRecords = sc.union(arrayOfFilePaths.map(p => Try(sc.textFile(p))).filter(_.isSuccess).map(_.get)) 

但是它看起來沒有成功避免了不存在的文件,我收到以下錯誤,當我嘗試allRecords.collect()

Input path does not exist: file:/path/to/unexistingFile 
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285) 

這裏有什麼不對的提示?

+1

這種過濾不工作的原因是火花的懶惰:'sc.textFile(P)'是懶惰 - 它返回一個RDD沒有實際閱讀file(yet),所以'Try(sc.textFile(p))'返回一個'Success'對象,而不管文件是否存在,並且沒有任何東西被過濾掉。然後,當你調用'collect'時 - 這就是讀數實際發生的時間,並且過濾器捕捉故障爲時已晚。你的回答確實是正確的路要走。 –

+0

很好的解釋@TzachZohar,謝謝! – Rami

回答

3

好的,我已經想出了一個解決方案。

我已經在加載文件之前過濾了數組。

import java.nio.file.{Paths, Files} 

val filteredPaths = arrayOfFilePaths.filter(p => Files.exists(Paths.get(p))).mkString(",") 

然後我就可以加載這些文件

val allRecords = sc.textFile(filteredPaths)