我使用Spark Streaming與Scala,並從卡夫卡獲取json記錄。我想解析它,以便我可以獲取值(日期時間和質量)和過程。Scala解析來自kafka的json記錄
這裏是我的代碼:
stream.foreachRDD(rdd => {
rdd.collect().foreach(i =>
println(msgParse(i.value()).quality)
)
})
而且我有這樣的情況下,階級和我解析功能:
case class diskQuality(datetime: String , quality : Double) extends Serializable
def msgParse(value: String): diskQuality = {
import org.json4s._
import org.json4s.native.JsonMethods._
implicit val formats = DefaultFormats
val res = parse(value).extract[diskQuality]
return res
}
我已經添加了這種相關性:
libraryDependencies += "org.json4s" % "json4s-native_2.10" % "3.2.4"
的記錄我收到此格式:
"{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
但是我得到這個錯誤:
Exception in thread "main" org.json4s.ParserUtil$ParseException: expected field or array Near: ,\"quality\":100.0}"
編輯:
當我嘗試解析使用它的工作原理相同的功能如下。但是,即使卡夫卡消息都以相同的格式,但它仍然給出了同樣的錯誤:
val test = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}"
我使用scalaVersion:=「2.10.6」和json4s-native_2.10"
任何幫助。將非常感激。謝謝你們的時間
第一種格式是正確的 - 「{\」datetime \「:\」14-05-2017 14:18:30 \「,\」quality \「:92.6}」。而你的代碼也適用於它。你能否檢查一下build.sbt中的Scala版本是什麼。 org.json4s依賴是2.10嗎?此外,您可以記錄msgParse函數的值參數,以檢查它的實際值。 –
感謝您的回覆,我編輯了我的問題,這是我打印msgParse時的值:「{\」datetime \「:\」24-04-2017 07:53:30 \「,\」quality \「:100.0}」 – AsmaaM
@AsmaaM如果這是您的控制檯輸出 - 您在引號轉義時遇到問題,您能否檢查您的製作人發送給kafka的內容? – ledniov