2017-02-17 114 views
1

我正在寫一個Spark/Scala程序來讀取ZIP文件,將它們解壓縮並將內容寫入一組新文件。我可以將其寫入本地文件系統,但是想知道是否有辦法將輸出文件寫入分佈式文件系統(如HDFS)。代碼顯示below`在Spark/Scala中寫入HDFS

import java.util.zip.ZipInputStream 
import org.apache.spark.input.PortableDataStream 
import java.io._ 

var i =1 
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file: (String,  PortableDataStream)) => 
    { 


    val zipStream = new ZipInputStream(file._2.open)    
    val entry = zipStream.getNextEntry        
    val iter = scala.io.Source.fromInputStream(zipStream).getLines   

    val fname = f"/d/tmp/myfile$i.txt" 


    i = i + 1 

    val xx = iter.mkString 
    val writer = new PrintWriter(new File(fname)) 
    writer.write(xx) 
    writer.close() 

    iter              
    }).collect() 

`

回答

3

您可以使用Hadoop的公用庫(如果你正在使用SBT作爲依賴manangement工具,加thath庫到你的依賴)容易寫數據到HDFS。這樣,您可以創建一個文件系統對象:

private val fs = { 
    val conf = new Configuration() 
    FileSystem.get(conf) 
    } 

一定要與你的Hadoop集羣信息(核心site.xml的,等等)

然後,你可以寫配置文件系統,例如字符串路徑(在你的情況下,你應該處理數據流),在HDFS如下:

@throws[IOException] 
    def writeAsString(hdfsPath: String, content: String) { 
    val path: Path = new Path(hdfsPath) 
    if (fs.exists(path)) { 
     fs.delete(path, true) 
    } 
    val dataOutputStream: FSDataOutputStream = fs.create(path) 
    val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8")) 
    bw.write(content) 
    bw.close 
    } 
0

你應該從官方文檔看看方法saveAsTextFile:http://spark.apache.org/docs/latest/programming-guide.html

它可以讓您保存到HDFS:

iter.saveAsTextFile("hdfs://...") 
+0

在該代碼中iter不是RDD,所以不能寫它。可能首先進行轉換。 – dumitru

+0

是的,我認爲我們會在這裏很好。 RDD應該是數據類型來操縱火花以便在集羣上獲得分佈式數據。 – chateaur

+0

這就是問題的癥結所在。我已經嘗試了所有我能想到的將我的數據傳遞給RDD以啓用saveasTextFile的使用,但是結果很短。如果有人已經解決了這個問題,請讓我知道 – user2699504

0

你可以試試saveAsTextFile方法。

將數據集的元素作爲文本文件(或文本文件集)寫入本地文件系統,HDFS或任何其他Hadoop支持的文件系統中的給定目錄中。 Spark將在每個元素上調用toString將其轉換爲文件中的一行文本。

它會將每個分區保存爲一個不同的文件。除非您重新分區或合併,否則最終將使用的分區數將與輸入文件數相同。

+0

請看我上面的評論,爲什麼使用saveasTextFile是一個問題 – user2699504

+0

不能你可以寫整個RDD而不是單獨的每個文件。而不是收集使用saveAsText文件? – NetanelRabinowitz

+0

將所有解壓縮數據連接成一個文件。這不是我想要的。我希望每個解壓縮文件都在它自己的單獨文件中 – user2699504

0
sc.binaryFiles("/user/example/zip_dir", 10)        //make an RDD from *.zip files in HDFS 
      .flatMap((file: (String, PortableDataStream)) => {     //flatmap to unzip each file 
       val zipStream = new ZipInputStream(file._2.open)    //open a java.util.zip.ZipInputStream 
       val entry = zipStream.getNextEntry        //get the first entry in the stream 
       val iter = Source.fromInputStream(zipStream).getLines   //place entry lines into an iterator 
       iter.next              //pop off the iterator's first line 
       iter               //return the iterator 
      }) 
      .saveAsTextFile("/user/example/quoteTable_csv/result.csv")