2017-09-30 101 views
2
import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class App implements Serializable {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
            .appName("SparkSQL")
            .master("local")
            .getOrCreate();
        // Person is a plain bean with name/age fields (its definition is not shown in the question)
        Dataset<Row> df = sparkSession.createDataFrame(
            Arrays.asList(new Person("panfei", 27)), Person.class);
        System.out.println(df.rdd().count());
    }
}

I am a Spark beginner running this code on my local machine. I have made sure that Person implements Serializable, yet the code still throws this exception:

17/09/30 18:13:26 INFO SparkContext: Starting job: count at App.java:32 
17/09/30 18:13:26 INFO DAGScheduler: Got job 0 (count at App.java:32) with 1 output partitions 
17/09/30 18:13:26 INFO DAGScheduler: Final stage: ResultStage 0 (count at App.java:32) 
17/09/30 18:13:26 INFO DAGScheduler: Parents of final stage: List() 
17/09/30 18:13:26 INFO DAGScheduler: Missing parents: List() 
17/09/30 18:13:26 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at rdd at App.java:32), which has no missing parents 
17/09/30 18:13:26 INFO TaskSchedulerImpl: Cancelling stage 0 
17/09/30 18:13:26 INFO DAGScheduler: ResultStage 0 (count at App.java:32) failed in Unknown s due to Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.collection.Iterator$$anon$11 
Serialization stack: 
    - object not serializable (class: scala.collection.Iterator$$anon$11, value: empty iterator) 
    - field (class: scala.collection.Iterator$$anonfun$toStream$1, name: $outer, type: interface scala.collection.Iterator) 
    - object (class scala.collection.Iterator$$anonfun$toStream$1, <function0>) 
    - field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0) 
    - object (class scala.collection.immutable.Stream$Cons, Stream([27,panfei])) 
    - field (class: org.apache.spark.sql.execution.LocalTableScanExec, name: rows, type: interface scala.collection.Seq) 
    - object (class org.apache.spark.sql.execution.LocalTableScanExec, LocalTableScan [age#0, name#1] 
) 
    - field (class: org.apache.spark.sql.execution.DeserializeToObjectExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan) 
    - object (class org.apache.spark.sql.execution.DeserializeToObjectExec, DeserializeToObject createexternalrow(age#0, name#1.toString, StructField(age,IntegerType,true), StructField(name,StringType,true)), obj#6: org.apache.spark.sql.Row 
+- LocalTableScan [age#0, name#1] 
) 
    - field (class: org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.DeserializeToObjectExec) 
    - object (class org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2, <function2>) 
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1, name: f$22, type: interface scala.Function2) 
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1, <function0>) 
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1) 
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24, <function3>) 
    - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3) 
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at rdd at App.java:32) 
    - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD) 
    - object (class org.apache.spark.OneToOneDependency, [email protected]) 
    - writeObject data (class: scala.collection.immutable.$colon$colon) 
    - object (class scala.collection.immutable.$colon$colon, List([email protected])) 
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq) 
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[3] at rdd at App.java:32) 
    - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD) 
    - object (class org.apache.spark.OneToOneDependency, [email protected]) 
    - writeObject data (class: scala.collection.immutable.$colon$colon) 
    - object (class scala.collection.immutable.$colon$colon, List([email protected])) 
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq) 
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[4] at rdd at App.java:32) 
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
    - object (class scala.Tuple2, (MapPartitionsRDD[4] at rdd at App.java:32,<function2>)) 
17/09/30 18:13:26 INFO DAGScheduler: Job 0 failed: count at App.java:32, took 0.098482 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.collection.Iterator$$anon$11 
Serialization stack: 
    - object not serializable (class: scala.collection.Iterator$$anon$11, value: empty iterator) 
    - field (class: scala.collection.Iterator$$anonfun$toStream$1, name: $outer, type: interface scala.collection.Iterator) 
    - object (class scala.collection.Iterator$$anonfun$toStream$1, <function0>) 
    - field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0) 
    - object (class scala.collection.immutable.Stream$Cons, Stream([27,panfei])) 
    - field (class: org.apache.spark.sql.execution.LocalTableScanExec, name: rows, type: interface scala.collection.Seq) 
    - object (class org.apache.spark.sql.execution.LocalTableScanExec, LocalTableScan [age#0, name#1] 
) 
    - field (class: org.apache.spark.sql.execution.DeserializeToObjectExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan) 
    - object (class org.apache.spark.sql.execution.DeserializeToObjectExec, DeserializeToObject createexternalrow(age#0, name#1.toString, StructField(age,IntegerType,true), StructField(name,StringType,true)), obj#6: org.apache.spark.sql.Row 
+- LocalTableScan [age#0, name#1] 
) 
    - field (class: org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.DeserializeToObjectExec) 
    - object (class org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2, <function2>) 
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1, name: f$22, type: interface scala.Function2) 
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1, <function0>) 
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1) 
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24, <function3>) 
    - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3) 
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at rdd at App.java:32) 
    - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD) 
    - object (class org.apache.spark.OneToOneDependency, [email protected]) 
    - writeObject data (class: scala.collection.immutable.$colon$colon) 
    - object (class scala.collection.immutable.$colon$colon, List([email protected])) 
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq) 
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[3] at rdd at App.java:32) 
    - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD) 
    - object (class org.apache.spark.OneToOneDependency, [email protected]) 
    - writeObject data (class: scala.collection.immutable.$colon$colon) 
    - object (class scala.collection.immutable.$colon$colon, List([email protected])) 
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq) 
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[4] at rdd at App.java:32) 
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
    - object (class scala.Tuple2, (MapPartitionsRDD[4] at rdd at App.java:32,<function2>)) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1000) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918) 
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:862) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1613) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) 
    at org.apache.spark.rdd.RDD.count(RDD.scala:1157) 
    at com.ctrip.market.dmp.spark.app.App.main(App.java:32) 
Caused by: java.io.NotSerializableException: scala.collection.Iterator$$anon$11 
Serialization stack: 
    - object not serializable (class: scala.collection.Iterator$$anon$11, value: empty iterator) 
    - field (class: scala.collection.Iterator$$anonfun$toStream$1, name: $outer, type: interface scala.collection.Iterator) 
    - object (class scala.collection.Iterator$$anonfun$toStream$1, <function0>) 
    - field (class: scala.collection.immutable.Stream$Cons, name: tl, type: interface scala.Function0) 
    - object (class scala.collection.immutable.Stream$Cons, Stream([27,panfei])) 
    - field (class: org.apache.spark.sql.execution.LocalTableScanExec, name: rows, type: interface scala.collection.Seq) 
    - object (class org.apache.spark.sql.execution.LocalTableScanExec, LocalTableScan [age#0, name#1] 
) 
    - field (class: org.apache.spark.sql.execution.DeserializeToObjectExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan) 
    - object (class org.apache.spark.sql.execution.DeserializeToObjectExec, DeserializeToObject createexternalrow(age#0, name#1.toString, StructField(age,IntegerType,true), StructField(name,StringType,true)), obj#6: org.apache.spark.sql.Row 
+- LocalTableScan [age#0, name#1] 
) 
    - field (class: org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.DeserializeToObjectExec) 
    - object (class org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2, <function2>) 
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1, name: f$22, type: interface scala.Function2) 
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1, <function0>) 
    - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1) 
    - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24, <function3>) 
    - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3) 
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at rdd at App.java:32) 
    - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD) 
    - object (class org.apache.spark.OneToOneDependency, [email protected]) 
    - writeObject data (class: scala.collection.immutable.$colon$colon) 
    - object (class scala.collection.immutable.$colon$colon, List([email protected])) 
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq) 
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[3] at rdd at App.java:32) 
    - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD) 
    - object (class org.apache.spark.OneToOneDependency, [email protected]) 
    - writeObject data (class: scala.collection.immutable.$colon$colon) 
    - object (class scala.collection.immutable.$colon$colon, List([email protected])) 
    - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq) 
    - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[4] at rdd at App.java:32) 
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
    - object (class scala.Tuple2, (MapPartitionsRDD[4] at rdd at App.java:32,<function2>)) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:993) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918) 
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:862) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1613) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
+0

So what is your question? –

+0

Is there anything wrong with my code? Thanks. – user7922907

Answers

-1

The main class does not need to implement Serializable.
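
To illustrate that point, here is a small sketch (not from the original answer; the ClosureDemo and Multiplier names are invented for the example) contrasting where serializability actually matters in Spark: only objects captured by a function that Spark ships to executors need to be serializable, not the driver class that holds main.

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class ClosureDemo {  // driver class: no Serializable marker needed
    // Captured by the lambda below and shipped to executors, so it must be serializable.
    static class Multiplier implements Serializable {
        final int factor;
        Multiplier(int factor) { this.factor = factor; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("ClosureDemo")
            .master("local")
            .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        Multiplier m = new Multiplier(3);
        // The closure passed to map() captures m, so it is m that gets serialized,
        // not the ClosureDemo class itself.
        long count = jsc.parallelize(Arrays.asList(1, 2, 3))
                        .map(x -> x * m.factor)
                        .count();
        System.out.println(count);
        spark.stop();
    }
}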

1

The problem has been solved by changing the Scala version from 2.10 to 2.11 everywhere; in any case, that version mismatch was what had me confused.
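
As a follow-up, here is a small diagnostic sketch (the VersionCheck name is invented, not part of the answer) that prints the Scala runtime and Spark versions actually on the classpath, one way to spot this kind of mismatch; the build-side fix is to keep the Scala suffix of every Spark artifact (e.g. spark-core_2.11, spark-sql_2.11) and the scala-library dependency on the same version. Calling scala.util.Properties from Java assumes the Scala library bundled with Spark is visible to the application.

import org.apache.spark.sql.SparkSession;

public class VersionCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("VersionCheck")
            .master("local")
            .getOrCreate();
        // Scala runtime on the classpath, e.g. "2.11.8"; it should match the _2.xx suffix of the Spark artifacts
        System.out.println("Scala: " + scala.util.Properties.versionNumberString());
        // Spark version the application is actually running against
        System.out.println("Spark: " + spark.version());
        spark.stop();
    }
}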