
I am using Spark 1.1.0 on CDH 5.2.0 and am trying to make sure I can read from and write to HDFS. Writing to HDFS from Spark does not work with the saveAsNewAPIHadoopFile method.

I quickly realized that .textFile and .saveAsTextFile call the old API, which does not seem to be compatible with our HDFS version.

def testHDFSReadOld(sc: SparkContext, readFile: String) {
  // THIS WILL FAIL WITH
  // (TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
  // java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)

  sc.textFile(readFile).take(2).foreach(println)
}

def testHDFSWriteOld(sc: SparkContext, writeFile: String) {
  // THIS WILL FAIL WITH
  // (TID 0, dl1rhd416.internal.edmunds.com): java.lang.IllegalStateException: unread block data
  // java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2420)

  sc.parallelize(List("THIS", "ISCOOL")).saveAsTextFile(writeFile)
}
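
For context, a minimal driver that wires these test methods together might look like the sketch below. The object name and HDFS paths are placeholders of mine, not values from the original post, and the sketch assumes the test methods above are defined in (or imported into) the same object.

import org.apache.spark.{SparkConf, SparkContext}

object HdfsTestDriver {
  def main(args: Array[String]) {
    // Hypothetical HDFS locations; substitute the real paths used on the cluster.
    val readFile  = "hdfs:///tmp/spark-hdfs-test/input.txt"
    val writeFile = "hdfs:///tmp/spark-hdfs-test/old-api-output"

    val conf = new SparkConf().setAppName("HdfsReadWriteTest")
    val sc   = new SparkContext(conf)

    testHDFSReadOld(sc, readFile)   // fails on the cluster as described above
    testHDFSWriteOld(sc, writeFile) // fails on the cluster as described above

    sc.stop()
  }
}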

Moving to the new API methods fixed reading from HDFS!

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

def testHDFSReadNew(sc: SparkContext, readFile: String) {
  // THIS WORKS
  sc.newAPIHadoopFile(readFile, classOf[TextInputFormat], classOf[LongWritable],
    classOf[Text], sc.hadoopConfiguration).map {
    case (x: LongWritable, y: Text) => y.toString
  }.take(2).foreach(println)
}
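
As a side note of mine (not from the original post), newAPIHadoopFile also has an overload that takes the input format and key/value classes as type parameters, which makes the same read a little more compact:

// Equivalent read using the type-parameter overload of newAPIHadoopFile.
sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](readFile)
  .map { case (_, line) => line.toString }
  .take(2)
  .foreach(println)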

So it seems I am making progress. Writing no longer fails hard like the old-API version above; it appears to work. The only problem is that nothing ends up in the directory except a lone _SUCCESS flag file. Even more puzzling, the logs show data being written to the _temporary directory. It looks as if the output committer never realizes it needs to move the files from the _temporary directory into the output directory.

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

def testHDFSWriteNew(sc: SparkContext, writeFile: String) {
  /* This will have an error message of:
     INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(dl1rhd400.internal.edmunds.com,35927)
     14/11/21 02:02:27 INFO ConnectionManager: Key not valid ? [email protected]
     14/11/21 02:02:27 INFO ConnectionManager: key already cancelled ? [email protected]
     java.nio.channels.CancelledKeyException
       at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
       at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)

     However, lately it hasn't even had errors; the symptom is that there are no part files in the
     output directory, but the success flag is there.
  */
  val conf = sc.hadoopConfiguration
  conf.set("mapreduce.task.files.preserve.failedtasks", "true")
  conf.set("mapred.output.dir", writeFile)
  sc.parallelize(List("THIS", "ISCOOL")).map(x => (NullWritable.get, new Text(x)))
    .saveAsNewAPIHadoopFile(writeFile, classOf[NullWritable], classOf[Text],
      classOf[TextOutputFormat[NullWritable, Text]], conf)
}
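
One way to confirm the symptom described above is to list the output path with the Hadoop FileSystem API right after the save. This is a sketch of my own, not code from the post; on the broken cluster run you would expect to see only _SUCCESS (and possibly a leftover _temporary directory) and no part-* files:

import org.apache.hadoop.fs.{FileSystem, Path}

def listOutput(sc: SparkContext, writeFile: String) {
  // Print whatever the output committer actually left behind in the target directory.
  val fs = FileSystem.get(sc.hadoopConfiguration)
  fs.listStatus(new Path(writeFile)).foreach { status =>
    println(s"${status.getPath.getName}\t${status.getLen} bytes")
  }
}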

When I run locally and specify an HDFS path, the files show up in HDFS just fine. This only happens when I run on our Spark standalone cluster.

I submit the job as follows: spark-submit --deploy-mode client --master spark://sparkmaster --class driverclass driverjar

Answer


Can you try it with the following code?

import org.apache.hadoop.io._ 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat 
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x))) 
nums.saveAsNewAPIHadoopFile[TextOutputFormat[IntWritable, Text]]("/data/newAPIHadoopFile") 

The following code also works for me.

val x = sc.parallelize(List("THIS","ISCOOL")).map(x => (NullWritable.get, new Text(x))) 
x.saveAsNewAPIHadoopFile("/data/nullwritable", classOf[NullWritable], classOf[Text], classOf[TextOutputFormat[NullWritable, Text]], sc.hadoopConfiguration) 

[root@sparkmaster ~]# hadoop fs -cat /data/nullwritable/*

15/08/20 02:09:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 