0
我想在 Spark 中將一個簡單的 JSON 反序列化為 case class，但得到一個例外——相同的程式碼在一般（非 Spark）環境中可以正常運作。
我使用的 json4s 版本如下：
"org.json4s" % "json4s-jackson_2.11" % "3.3.0"
代碼:
import org.apache.spark.{SparkContext, SparkConf}
import scala.util.Try
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods._
import org.json4s.jackson.Serialization.{read, write}
/** Minimal reproduction: extract a JSON document into a case class inside a Spark `map`. */
object TestParse {

  /**
   * Target type for the json4s extraction.
   *
   * Declared as a member of the object instead of inside `run()`. A class local to a method
   * gets a constructor tied to its enclosing scope, which reflection-based extraction is not
   * expected to be able to satisfy.
   * NOTE(review): this is the likely cause of the reported
   * `MappingException: unknown error` (caused by a `NullPointerException`) -- confirm by
   * re-running the job; a json4s version clash with the one bundled in Spark is the other
   * usual suspect.
   */
  final case class TestObj(name: String, value: String)

  /**
   * Starts a local Spark context, parses the same JSON literal once per element of a small
   * RDD, and prints the first extraction result (a `Try[TestObj]`).
   */
  def run(): Unit = {
    val sconf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(sconf)
    try {
      val testData = sc.parallelize(List.range(1, 10))
      // The element itself is unused; the original parameter name `val` is a reserved word
      // and did not compile.
      val dataObjsRDD = testData.map { _ =>
        implicit val formats = DefaultFormats // Workaround as DefaultFormats is not serializable
        Try {
          val jsonObj = parse("""{"name": "TheName", "value":"TheValue"}""")
          jsonObj.extract[TestObj]
        }
      }
      val d = dataObjsRDD.take(1)
      // Printing the Array directly would only show its JVM identity, not its contents.
      println(d.mkString(", "))
    } finally {
      // Release the context even when the job throws.
      sc.stop()
    }
  }
}
我得到的例外是:
result = {[email protected]} "Failure(org.json4s.package$MappingException: unknown error)"
exception = {[email protected]} "org.json4s.package$MappingException: unknown error"
msg = "unknown error"
value = {char[13]@7846}
hash = 0
cause = {[email protected]} "java.lang.NullPointerException"
detailMessage = null
cause = {[email protected]} "java.lang.NullPointerException"
stackTrace = {StackTraceElement[40]@7845}
suppressedExceptions = {[email protected]} size = 0
detailMessage = "unknown error"
value = {char[13]@7846}
hash = 0
Throwable.cause = {[email protected]} "java.lang.NullPointerException"
detailMessage = null
cause = {[email protected]} "java.lang.NullPointerException"
stackTrace = {StackTraceElement[40]@7845}
suppressedExceptions = {[email protected]} size = 0
stackTrace = {StackTraceElement[29]@7780}
0 = {[email protected]} "org.json4s.Extraction$.extract(Extraction.scala:47)"
1 = {[email protected]} "org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)"
2 = {[email protected]} "TestParse$$anonfun$1$$anonfun$apply$1.apply(TestParse.scala:22)"
3 = {[email protected]} "TestParse$$anonfun$1$$anonfun$apply$1.apply(TestParse.scala:20)"
4 = {[email protected]} "scala.util.Try$.apply(Try.scala:161)"
5 = {[email protected]} "TestParse$$anonfun$1.apply(TestParse.scala:20)"
6 = {[email protected]} "TestParse$$anonfun$1.apply(TestParse.scala:18)"
7 = {[email protected]} "scala.collection.Iterator$$anon$11.next(Iterator.scala:328)"
8 = {[email protected]} "scala.collection.Iterator$class.foreach(Iterator.scala:727)"
9 = {[email protected]} "scala.collection.AbstractIterator.foreach(Iterator.scala:1157)"
10 = {[email protected]} "scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)"
11 = {[email protected]} "scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)"
12 = {[email protected]} "scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)"
13 = {[email protected]} "scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)"
14 = {[email protected]} "scala.collection.AbstractIterator.to(Iterator.scala:1157)"
15 = {[email protected]} "scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)"
16 = {[email protected]} "scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)"
17 = {[email protected]} "scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)"
18 = {[email protected]} "scala.collection.AbstractIterator.toArray(Iterator.scala:1157)"
19 = {[email protected]} "org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:909)"
20 = {[email protected]} "org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:909)"
21 = {[email protected]} "org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)"
22 = {[email protected]} "org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)"
23 = {[email protected]} "org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)"
24 = {[email protected]} "org.apache.spark.scheduler.Task.run(Task.scala:88)"
25 = {[email protected]} "org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)"
26 = {[email protected]} "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)"
27 = {[email protected]} "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)"
28 = {[email protected]} "java.lang.Thread.run(Thread.java:745)"
suppressedExceptions = {[email protected]} size = 0
就是這樣 - 謝謝! –