我很新的spark,我試圖從kafka主題接收一個結構化爲json的DStream,我想解析每個json的內容。我收到的JSON是這樣的:在spark-streaming中解析json
{"type":"position","ident":"IBE32JZ","air_ground":"A","alt":"34000","clock":"1409733420","id":"IBE32JZ-1409715361-ed-0002:0","gs":"446","heading":"71","lat":"44.50987","lon":"2.98972","reg":"ECJRE","squawk":"1004","updateType":"A","altChange":" "}
我想只提取IDENT領域,至少現在,我使用電梯,JSON庫解析德數據。我的計劃是這樣的:
但低於拋出我的異常:
java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
at net.liftweb.json.JsonAST$JValue.extract(JsonAST.scala:300)
at aero.catec.stratio.ScalaExample$.parser(ScalaExample.scala:33)
at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:575)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:560)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
的事情是,如果運行不使用火花(從文件中讀取)同它完美。當我嘗試將它放入火花程序時,問題就開始了。另外,如果我將解析器功能更改爲如下所示:
def parser(json: String): JValue = {
val parsedJson = parse(json)
return (parsedJson \\ "ident")
}
它也有效。但是當我嘗試提取實際的字符串時,我得到了同樣的錯誤。
謝謝你的幫助。我希望我解釋得很好。
這可能是你正在使用的scala版本不匹配。 – 2014-09-03 12:20:13
我可以認爲應該解析「paso1.extract [PlaneInfo]」json.extract [PlaneInfo]? – Gillespie 2015-09-02 15:42:44