2017-06-19 26 views
0

我節省在文本文件中每個事件如下:Spark:如何添加每個RDD的大小並寫入新文件?

map{ case (_, record) => getEventFromRecord(record) }.map(m => m.toByteArray).saveAsTextFile(outputPath) 

我也想保存我保存到文本文件中的每個事件的總規模。 1)如何將每條記錄的總大小保存到新文件? 2)我嘗試使用累加器

val accum = sparkContext.accumulator(0, "My Accumulator") 
map{ case (_, record) => getEventFromRecord(record) }.foreach(m => accum += (m.toByteArray.length)).saveAsTextFile(outputPath) 

但我得到以下錯誤:

  value saveAsTextFile is not a member of Unit 

[error]  sparkContext.sequenceFile(inputDirectory, classOf[IntWritable], classOf[DataOutputValue]).map{ case (_, record) => getEventFromRecord(record) }.foreach(m => accum += (m.toByteArray.length)).saveAsTextFile(outputPath) 
[error]                                                   ^
[error] one error found 
[error] (compile:compileIncremental) Compilation failed 

回答

1

在foreach操作返回單元作爲結果和只用於副作用。如果你想收集您的RDD的總和,使用reduce行動

val totalSize = map{ case (_, record) => getEventFromRecord(record).toByteArray.length}.reduce{_ + _} 

這將返回驅動程序上求和的結果。然後,您可以使用Hadoop文件系統API創建一個新文件並寫入它。

val fs = FileSystem.get(new Configuration()) 
val outputWriter = new PrintWriter(fs.create(outputPath)) 
outputWriter.println(totalSize) 
outputWriter.flush() 
outputWriter.close() 

注意在生產,你可能會想給的OutputStream包裝到一個try/catch/finally塊或相似,以確保您的資源接近正常與IO你做的任何文件。

+0

謝謝!但我不認爲上面的代碼有效。我想保存輸出文件中所有記錄的TOTAL長度。但上面的代碼給我多個對象這樣'[B @ 157ac242 [B @ 343b877f [B @ 2a8481f5 [B @ 12a57ac4 [B @ 518ed200' – NoName

+0

@Noname,對不起,我誤解你的問題。請看我更新的答案。 – puhlen

+0

謝謝你的回答!有什麼辦法可以使這個RDD和使用'saveAsTextFile'寫入文本文件? – NoName

相關問題