
Unable to write a sequence file with the Spark RDD API. I am using the following code to write an RDD as a sequence file:

@Test
def testSparkWordCount(): Unit = {
  val words = Array("Hello", "Hello", "World", "Hello", "Welcome", "World")
  val conf = new SparkConf().setMaster("local").setAppName("testSparkWordCount")
  val sc = new SparkContext(conf)

  val dir = "file:///" + System.currentTimeMillis()
  sc.parallelize(words).map(x => (x, 1)).saveAsHadoopFile(
    dir,
    classOf[Text],
    classOf[IntWritable],
    classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[Text, IntWritable]]
  )

  sc.stop()
}

When I run it, it fails with:

Caused by: java.io.IOException: wrong key class: java.lang.String is not class org.apache.hadoop.io.Text 
    at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1373) 
    at org.apache.hadoop.mapred.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:76) 
    at org.apache.spark.internal.io.SparkHadoopWriter.write(SparkHadoopWriter.scala:94) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1139) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1137) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1137) 
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1360) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1145) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 

Should I have to use sc.parallelize(words).map(x => (new Text(x), new IntWritable(1))) instead of sc.parallelize(words).map(x => (x, 1))? I don't think I should have to wrap the values explicitly, since SparkContext already provides implicits that wrap the primitive types into their corresponding Writables.

So, what should I do to make this code work?

Answer


Yes, SparkContext does provide implicit conversions. However, those conversions are not applied during the save itself; they have to be triggered in the usual Scala way:

import org.apache.spark.SparkContext._
val mapperFunction: String => (Text, IntWritable) = x => (x, 1)
... parallelize(words).map(mapperFunction).saveAsHadoopFile ...
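
For completeness, here is a sketch of how the original test might look with that fix applied. It assumes a Spark 2.x build (as suggested by the stack trace) where the deprecated Writable implicits in the SparkContext companion object are still available, and reuses the same words/dir setup as in the question:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // String -> Text, Int -> IntWritable implicits

def testSparkWordCount(): Unit = {
  val words = Array("Hello", "Hello", "World", "Hello", "Welcome", "World")
  val conf = new SparkConf().setMaster("local").setAppName("testSparkWordCount")
  val sc = new SparkContext(conf)

  val dir = "file:///" + System.currentTimeMillis()

  // Giving the mapper an explicit String => (Text, IntWritable) type makes the
  // compiler apply the implicit conversions before the pairs reach the Hadoop
  // record writer, so the key/value classes match the ones declared below.
  val mapperFunction: String => (Text, IntWritable) = x => (x, 1)

  sc.parallelize(words)
    .map(mapperFunction)
    .saveAsHadoopFile(
      dir,
      classOf[Text],
      classOf[IntWritable],
      classOf[SequenceFileOutputFormat[Text, IntWritable]]
    )

  sc.stop()
}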

Got it, thanks @pashaz for the helpful answer – Tom


Also, to have the implicit conversion applied for you, you can use the saveAsSequenceFile method: .map(x => (x, 1)).saveAsSequenceFile(dir) – pasha701
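
As a sketch of that alternative (assuming the same sc, words and dir as in the question), saveAsSequenceFile comes from SequenceFileRDDFunctions and converts the String keys and Int values to Text/IntWritable internally, so no explicit mapping to Writable types is needed:

// Writes a SequenceFile[Text, IntWritable] without naming the Writable classes.
sc.parallelize(words)
  .map(x => (x, 1))
  .saveAsSequenceFile(dir)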