2015-12-18

I want to deserialize a simple piece of JSON into a case class in Spark, but I get an exception. The same code works fine in a regular (non-Spark) environment.

I'm using the following version of json4s:

"org.json4s" % "json4s-jackson_2.11" % "3.3.0" 

Code:

import org.apache.spark.{SparkContext, SparkConf}
import scala.util.Try
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization.{read, write}

object TestParse {
  def run() = {

    val sconf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(sconf)

    case class TestObj(name: String, value: String)

    val testData = sc.parallelize(List.range(1, 10))

    val dataObjsRDD = testData.map { _ =>
      implicit val formats = DefaultFormats // Workaround, as DefaultFormats is not serializable
      Try { // Always results in Failure
        val jsonObj = parse("""{"name": "TheName", "value":"TheValue"}""")
        jsonObj.extract[TestObj]
      }
    }

    val d = dataObjsRDD.take(1)
    println(d.mkString)
  }
}

The exception I get is:

result = Failure(org.json4s.package$MappingException: unknown error)

org.json4s.package$MappingException: unknown error
    at org.json4s.Extraction$.extract(Extraction.scala:47)
    at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
    at TestParse$$anonfun$1$$anonfun$apply$1.apply(TestParse.scala:22)
    at TestParse$$anonfun$1$$anonfun$apply$1.apply(TestParse.scala:20)
    at scala.util.Try$.apply(Try.scala:161)
    at TestParse$$anonfun$1.apply(TestParse.scala:20)
    at TestParse$$anonfun$1.apply(TestParse.scala:18)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:909)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:909)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException

Answer


This isn't a Spark problem. The issue is that you've defined your case class inside the method itself: json4s instantiates case classes via reflection, and a case class declared inside a method carries a hidden reference to its enclosing scope that json4s cannot supply, hence the NullPointerException. If you define the case class at the top level (outside the TestParse object), it should work.

See https://github.com/json4s/json4s/issues/125 for more information.
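
For reference, here's a minimal sketch of the corrected code with the case class moved to the top level (names kept from the question; the jackson parse import is an assumption, matching the declared json4s-jackson dependency):

import org.apache.spark.{SparkContext, SparkConf}
import scala.util.Try
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._

// Top level: json4s can now instantiate TestObj reflectively, because the
// case class no longer carries a hidden reference to an enclosing scope.
case class TestObj(name: String, value: String)

object TestParse {
  def run() = {
    val sconf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(sconf)

    val testData = sc.parallelize(List.range(1, 10))

    val dataObjsRDD = testData.map { _ =>
      implicit val formats = DefaultFormats // still per task, since DefaultFormats is not serializable
      Try(parse("""{"name": "TheName", "value":"TheValue"}""").extract[TestObj])
    }

    dataObjsRDD.take(1).foreach(println) // prints Success(TestObj(TheName,TheValue))
  }
}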


That was it - thanks!