我正在寫一個Spark/Scala程序來讀取ZIP文件,將它們解壓縮並將內容寫入一組新文件。我可以將其寫入本地文件系統,但是想知道是否有辦法將輸出文件寫入分佈式文件系統(如HDFS)。代碼顯示below`在Spark/Scala中寫入HDFS
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._
var i =1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file: (String, PortableDataStream)) =>
{
val zipStream = new ZipInputStream(file._2.open)
val entry = zipStream.getNextEntry
val iter = scala.io.Source.fromInputStream(zipStream).getLines
val fname = f"/d/tmp/myfile$i.txt"
i = i + 1
val xx = iter.mkString
val writer = new PrintWriter(new File(fname))
writer.write(xx)
writer.close()
iter
}).collect()
`
在該代碼中iter不是RDD,所以不能寫它。可能首先進行轉換。 – dumitru
是的,我認爲我們會在這裏很好。 RDD應該是數據類型來操縱火花以便在集羣上獲得分佈式數據。 – chateaur
這就是問題的癥結所在。我已經嘗試了所有我能想到的將我的數據傳遞給RDD以啓用saveasTextFile的使用,但是結果很短。如果有人已經解決了這個問題,請讓我知道 – user2699504