2016-06-26 29 views
2

我有一些中間數據需要存儲在HDFS和本地數據中。我正在使用Spark 1.6。在HDFS中作爲中間形式,我在/output/testDummy/part-00000/output/testDummy/part-00001中獲取數據。我想使用Java/Scala將這些分區保存在本地,以便我可以將它們分別保存爲/users/home/indexes/index.nt(通過合併在本地)或/users/home/indexes/index-0000.nt/home/indexes/index-0001.nt使用mapPartition和迭代器保存火花RDD

這裏是我的代碼: 注:testDummy是相同的測試,輸出與兩個分區。我想將它們單獨存儲或合併,但使用index.nt文件進行本地存儲。我更喜歡分開存儲在兩個數據節點中。我正在使用集羣並在YARN上提交Spark任務。我還添加了一些評論,多少次以及我得到的數據。我怎麼辦?任何幫助表示讚賞。

val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy") 
println("testDummy done") //1 time print 

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = { 
    println("Inside savesData")         // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2 
    println("iter size"+iterator.size)       // 2 735 2 735 values 
    val filenamesWithExtension = outputPath + "/index.nt" 
    println("filenamesWithExtension "+filenamesWithExtension.length) //4 times 
    var list = List[(String)]() 

    val fileWritter = new FileWriter(filenamesWithExtension,true) 
    val bufferWritter = new BufferedWriter(fileWritter) 

    while (iterator.hasNext){      //iterator.hasNext is false 
     println("inside iterator")     //0 times 
     val dat = iterator.next() 
     println("datadata "+iterator.next()) 

     bufferWritter.write(dat + "\n") 
     bufferWritter.flush() 
     println("index files written") 

     val dataElements = dat.split(" ") 
     println("dataElements")         //0 
     list = list.::(dataElements(0)) 
     list = list.::(dataElements(1)) 
     list = list.::(dataElements(2)) 
    } 
    bufferWritter.close() //closing 
    println("savesData method end")       //4 times when coal=2 
    list.iterator 
} 

println("before saving data into local")        //1 
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData) 
println("testRDD partitions "+test.getNumPartitions)        //2 
println("testRDD size "+test.collect().length)        //0 
println("after saving data into local") //1 

PS:我也跟着,thisthis但不完全相同什麼我尋找,我也莫名其妙,但沒有得到任何東西index.nt

+2

斯卡拉已經使'名單使世界變得更好一點::(dataElements(2))'等同於'dataElements(2):: list'所以不要讓斯卡拉難過,並開始使用這種語法在至少當方法類似操作符時。順便說一句,'ListBuffer'可能在這裏更合適 – Dici

回答

5

幾件事情:

  • 如果您打算稍後使用數據,請不要致電Iterator.sizeIteratorsTraversableOnce。計算Iterator大小的唯一方法是遍歷其所有元素,之後不再有數據要讀取。
  • 請勿使用像mapPartitions這樣的轉換作爲副作用。如果您想執行某種類型的IO使用操作,如foreach/foreachPartition。這是一個糟糕的做法,並不能保證給定的代碼只會被執行一次。
  • 行動或轉換內部的本地路徑是特定工作人員的本地路徑。如果你想直接在客戶端機器上寫入,你應該首先使用collecttoLocalIterator來獲取數據。儘管寫入分佈式存儲並稍後獲取數據可能會更好。
0

Java 7中提供了手段看目錄。

https://docs.oracle.com/javase/tutorial/essential/io/notification.html

的想法是創建一個監視服務,與感興趣的目錄(何況你感興趣的活動,如文件的創建,刪除等)進行註冊,你看,你會通知任何事件,如創建,刪除等,你可以採取任何你想要的行動。

你將不得不依賴於Java的API HDFS嚴重(如適用)。

在後臺運行程序,因爲它會永久等待事件。 (你以後做任何你想要的你可以編寫邏輯退出)

。另一方面,shell腳本也會有幫助。

要知道HDFS文件系統的一致性模型的同時讀取文件。

希望這有助於一些想法。