2016-02-29 55 views
0

在Spark中,我有一個RDD,其中包含數百萬到本地文件的路徑(我們有一個共享的文件系統,因此它們在本地顯示)。在Scala中,我將如何創建一個由每個文件中所有行組成的RDD?在Spark中讀取數百萬本地文件

我試圖做這樣的事情:

paths.flatMap(path => sc.textFile(path)) 

但沒有奏效。我也試過這樣的:

paths.flatMap(path => 
    scala.io.Source.fromInputStream(new java.io.FileInputStream(path)).getLines 
) 

這工作時本地工作,但沒有在多臺機器上運行時。我結束了這個錯誤:

java.nio.charset.MalformedInputException: Input length = 1 
    at java.nio.charset.CoderResult.throwException(CoderResult.java:277) 

任何指針將不勝感激

(大多數解決方案點至今涉及通過文件的列表sc.textFile全部一次,這是不可能的,因爲名單可能非常大,現在一個典型的用例會產生20M的路徑,這不適合單個Java String)。

回答

2

如果他們在一個目錄,那麼它可能會更好看整個目錄

sc.textFile("/path/to/directory/") 

將合併所有文件到一個單一的RDD,尋找出MEM約束。或者你可以嘗試的地圖,則減少:

paths.map(path => sc.textFile(path)).reduce(_.union(_)) 

或者甚至zero323建議更好:

SparkContext.union(paths.map(...)) 
+0

@ zero323哦,我不知道,很高興知道,我認爲最大字符串長度是65535個字符,但是2^31-1大約是2個字符,假設每個路徑是20個字符,它應該是足夠的。 – GameOfThrows

+0

謝謝:)我可以用'SparkContext.union(paths.map(...))'來替換'.reduce(_。union(_))'嗎?這真的會產生__huge差異_。 (如果你這樣做,請從維基答案中刪除最後一段) – zero323

+0

@ zero323哇,我沒想到這會更快,但它確實有很大的不同,你介意簡單解釋一下爲什麼?這與數據混洗有關嗎? – GameOfThrows

2

此:

paths.flatMap(path => sc.textFile(path)) 

根本無法編譯何況工作,因爲RDDS不TraversableOnce

直接讀取文件時看到的錯誤(java.nio.charset.MalformedInputException)與Spark不相關,並在文件編碼不正確時引發。引述MalformedInputException documentation

Checked exception thrown when an input byte sequence is not legal for given charset, or an input character sequence is not a legal sixteen-bit Unicode sequence. You can solve it providing a codec for fromInputStream method:

def fromInputStream(is: InputStream)(implicit codec: Codec) 

並使用Codec.onMalformedInput具有所需CodingErrorAction

參見例如Source.fromInputStream exception handling during reading lines

此外,當您直接讀取數據時,例如通過用Try包裝讀取塊,您應該處理IO異常。

Spark本身支持讀取完整的目錄樹。沒有理由傳遞個別文件,只有頂級目錄列表並使用mapreduce.input.fileinputformat.input.dir.recursive配置的正確設置。也可以通過多根目錄作爲一個逗號分隔的字符串:

sc.textFile("/foo,/bar") 

您也可以使用通配符來讀取文件和目錄的任意列表:

sc.textFile(Seq(
    "/foo/bar/*", 
    "/bar/*/*.txt" 
).mkString(",")) 

最後閱讀大量的小由於計算輸入分割的方式,文件效率低下。而不是使用textFiles,你應該考慮與CombineFileInputFormat例如一個子類閱讀:

sc.hadoopFile(
    paths, 
    classOf[CombineTextInputFormat], 
    classOf[LongWritable], classOf[Text] 
).map(_._2.toString) 

最後,你可以通過@GameOfThrows建議,但它不應該在沒有檢查點to avoid issues with long lineages反覆做union多個RDDS。改爲使用SparkContext.union並控制分區數量。

相關問題