2014-01-12
1

Apache Spark, NotSerializableException: org.apache.hadoop.io.Text

Here is my code:

val bg = imageBundleRDD.first()          // bg: (Text, BundleWritable)
val res = imageBundleRDD.map(data => {
  val desBundle = colorToGray(bg._2)     // line A: NotSerializableException: org.apache.hadoop.io.Text
  //val desBundle = colorToGray(data._2) // line B: everything is ok
  (data._1, desBundle)
})
println(res.count)

Line B runs fine, but line A fails with: org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.io.Text

I tried to solve the problem with Kryo, but nothing seems to change:

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.io.Text
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Text])
    kryo.register(classOf[BundleWritable])
  }
}

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator") 
val sc = new SparkContext(... 

Thanks!


Duplicate, see http://stackoverflow.com/a/22594142/1586965 – samthebest


Use this answer: http://stackoverflow.com/a/25270600/1586965 – samthebest

Answers

1

I had a similar problem when my Java code was reading sequence files containing Text keys. I found this post helpful:

http://apache-spark-user-list.1001560.n3.nabble.com/How-to-solve-java-io-NotSerializableException-org-apache-hadoop-io-Text-td2650.html

In my case, I converted the Text to a String using a map transformation:

JavaPairRDD<String, VideoRecording> mapped = videos.map(new PairFunction<Tuple2<Text, VideoRecording>, String, VideoRecording>() {
    @Override
    public Tuple2<String, VideoRecording> call(
            Tuple2<Text, VideoRecording> kv) throws Exception {
        // Necessary to copy value as Hadoop chooses to reuse objects
        VideoRecording vr = new VideoRecording(kv._2);
        return new Tuple2(kv._1.toString(), vr);
    }
});

Note this remark in the API documentation for the sequenceFile method of JavaSparkContext:

Note: Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD will create many references to the same object. If you plan to directly cache Hadoop writable objects, you should first copy them using a map function.
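In Scala, the same copy-before-cache idea might look like the sketch below. The BundleWritable copy constructor is an assumption borrowed from the Java example above; the question never shows one, so adapt the copy step to whatever your Writable provides.

// Minimal Scala sketch of the copy-before-cache pattern from the note above.
// Assumes BundleWritable has a copy constructor (hypothetical); the Text key is
// converted to a String so cached records hold no reused Hadoop Writables.
val safeRDD = imageBundleRDD.map { case (key, bundle) =>
  (key.toString, new BundleWritable(bundle))  // copy: the RecordReader reuses objects
}
safeRDD.cache()   // each cached record now owns its own data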

0

The reason your code has the serialization problem is that your Kryo setup, while close, isn't quite right:

Change:

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
System.setProperty("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator") 
val sc = new SparkContext(... 

to:

val sparkConf = new SparkConf() 
    // ... set master, appname, etc, then: 
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    .set("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator") 

val sc = new SparkContext(sparkConf) 
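For reference, a complete construction might look like this sketch; the master URL and application name are placeholders, not values taken from the question:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: "local[*]" and the app name below are illustrative placeholders.
val sparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("image-reconstruction")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "hequn.spark.reconstruction.MyRegistrator")

val sc = new SparkContext(sparkConf)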
0

When processing sequence files with Apache Spark, we have to follow these techniques:

 
-- Use Java-equivalent data types in place of Hadoop data types.
-- Spark automatically converts the Writables into their Java-equivalent types.

Example: we have a sequence file "xyz" whose key type is Text and whose value
type is LongWritable. When we use this file to create an RDD, we need to use
their Java-equivalent data types, i.e., String and Long respectively.

val mydata = sc.sequenceFile[String, Long]("path/to/xyz")
mydata.collect
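If you do need the Hadoop Writable classes rather than the automatic conversion, a sketch of the explicit form (value type assumed from the example above) is:

import org.apache.hadoop.io.{LongWritable, Text}

// Explicit-Writable sketch: convert each record to plain types before caching,
// because Hadoop reuses the same Writable instances for every record.
val raw = sc.sequenceFile("path/to/xyz", classOf[Text], classOf[LongWritable])
val converted = raw.map { case (k, v) => (k.toString, v.get) }
converted.cache()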