
Task not serializable exception in Spark when calling JavaPairRDD.max

When running this in IntelliJ, I get an exception: Exception in thread "main" org.apache.spark.SparkException: Task not serializable. Code excerpt:

```java

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class MostPopularSuperHero {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("MostPopularSuperHero").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Loads the hero-id -> hero-name mapping from the names file.
        class HrDict {
            Map<Integer, String> getHeroDict() {
                Map<Integer, String> heroDict = new HashMap<>();
                try (BufferedReader br = new BufferedReader(
                        new FileReader("/Users/11130/udemy/SparkCourse/Marvel-Names.txt"))) {
                    String sCurrentLine;
                    while ((sCurrentLine = br.readLine()) != null) {
                        String[] fields = sCurrentLine.split(" ", 2);
                        heroDict.put(Integer.parseInt(fields[0]), fields[1]);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return heroDict;
            }
        }

        // Orders (count, name) pairs by count. Note: does not implement Serializable.
        class DummyComparator implements Comparator<Tuple2<Integer, String>> {
            @Override
            public int compare(Tuple2<Integer, String> o1, Tuple2<Integer, String> o2) {
                return Integer.compare(o1._1(), o2._1());
            }
        }

        Broadcast<Map<Integer, String>> heroDict = sc.broadcast(new HrDict().getHeroDict());
        JavaRDD<String> lines = sc.textFile("/Users/11130/udemy/SparkCourse/Marvel-Graph.txt");

        // Each line lists one hero id followed by its co-appearing heroes; sum per hero.
        JavaPairRDD<Integer, Integer> countOfOccurences = lines.mapToPair(
                s -> {
                    String[] heroes = s.split(" ");
                    return new Tuple2<>(Integer.parseInt(heroes[0]), heroes.length - 1);
                }
        ).reduceByKey((x, y) -> x + y);

        // Flip to (count, name) so max() can compare on the count.
        JavaPairRDD<Integer, String> flippedCountOfOccurences = countOfOccurences.mapToPair(
                s -> new Tuple2<>(s._2(), heroDict.getValue().get(s._1()))
        );

        // This is the call that throws: max() ships the comparator to the executors.
        Tuple2<Integer, String> result = flippedCountOfOccurences.max(new DummyComparator());

        System.out.println("The most popular superhero is " + result._2()
                + " with " + result._1() + " occurrences");
    }
}

```

Error stack trace:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1008) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007) 
    at org.apache.spark.rdd.RDD$$anonfun$max$1.apply(RDD.scala:1396) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.max(RDD.scala:1395) 
    at org.apache.spark.api.java.JavaRDDLike$class.max(JavaRDDLike.scala:602) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.max(JavaRDDLike.scala:46) 
    at MostPopularSuperHero.main(MostPopularSuperHero.java:73) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) 
Caused by: java.io.NotSerializableException: MostPopularSuperHero$1DummyComparator 
Serialization stack: 
    - object not serializable (class: MostPopularSuperHero$1DummyComparator, value: [email protected]) 
    - field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator) 
    - object (class scala.math.LowPriorityOrderingImplicits$$anon$7, [email protected]) 
    - field (class: org.apache.spark.rdd.RDD$$anonfun$max$1, name: ord$10, type: interface scala.math.Ordering) 
    - object (class org.apache.spark.rdd.RDD$$anonfun$max$1, <function0>) 
    - field (class: org.apache.spark.rdd.RDD$$anonfun$max$1$$anonfun$apply$51, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$max$1) 
    - object (class org.apache.spark.rdd.RDD$$anonfun$max$1$$anonfun$apply$51, <function2>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
    ... 21 more 
'Caused by: java.io.NotSerializableException: MostPopularSuperHero$1DummyComparator'. Have you tried making 'DummyComparator' serializable? –

How do we do that? – Karshit

Just add 'Serializable' to the implements clause of 'class DummyComparator implements Comparator<Tuple2<Integer, String>>' –

Answer


This is what I used (basically, we need to implement Serializable):

class DummyComparator implements Serializable, Comparator<Tuple2<Integer, String>> {
    @Override
    public int compare(Tuple2<Integer, String> o1, Tuple2<Integer, String> o2) {
        return Integer.compare(o1._1(), o2._1());
    }
}
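For reference, on Java 8+ the same fix works without a named class: cast a lambda to an intersection type that includes Serializable. This is a minimal sketch, not part of the original answer; it reuses the flippedCountOfOccurences RDD from the question:

```java
// Sketch (assumes Java 8+): the intersection-type cast makes the compiler
// generate a serializable lambda, so Spark can ship it to the executors.
Tuple2<Integer, String> result = flippedCountOfOccurences.max(
        (Comparator<Tuple2<Integer, String>> & Serializable)
                (a, b) -> Integer.compare(a._1(), b._1()));
```

Either way the root cause is the same: RDD.max wraps the comparator in a scala.math.Ordering and uses it inside reduce, so Spark has to serialize it with the task closure, which is exactly what the NotSerializableException in the stack trace reports.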