2017-06-15 136 views
0

我使用Spark Streaming與Scala,並從卡夫卡獲取json記錄。我想解析它,以便我可以獲取值(日期時間和質量)和過程。Scala解析來自kafka的json記錄

這裏是我的代碼:

stream.foreachRDD(rdd => { 
    rdd.collect().foreach(i => 
    println(msgParse(i.value()).quality) 
) 
}) 

而且我有這樣的情況下,階級和我解析功能:

case class diskQuality(datetime: String , quality : Double) extends Serializable 

def msgParse(value: String): diskQuality = { 

    import org.json4s._ 
    import org.json4s.native.JsonMethods._ 

    implicit val formats = DefaultFormats 

    val res = parse(value).extract[diskQuality] 
    return res 

} 

我已經添加了這種相關性:

libraryDependencies += "org.json4s" % "json4s-native_2.10" % "3.2.4" 

的記錄我收到此格式:

"{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}" 

但是我得到這個錯誤:

Exception in thread "main" org.json4s.ParserUtil$ParseException: expected field or array Near: ,\"quality\":100.0}" 

編輯:

當我嘗試解析使用它的工作原理相同的功能如下。但是,即使卡夫卡消息都以相同的格式,但它仍然給出了同樣的錯誤:

val test = "{\"datetime\":\"14-05-2017 14:18:30\",\"quality\":92.6}" 

我使用scalaVersion:=「2.10.6」和json4s-native_2.10"

任何幫助。將非常感激。謝謝你們的時間

+0

第一種格式是正確的 - 「{\」datetime \「:\」14-05-2017 14:18:30 \「,\」quality \「:92.6}」。而你的代碼也適用於它。你能否檢查一下build.sbt中的Scala版本是什麼。 org.json4s依賴是2.10嗎?此外,您可以記錄msgParse函數的值參數,以檢查它的實際值。 –

+0

感謝您的回覆,我編輯了我的問題,這是我打印msgParse時的值:「{\」datetime \「:\」24-04-2017 07:53:30 \「,\」quality \「:100.0}」 – AsmaaM

+0

@AsmaaM如果這是您的控制檯輸出 - 您在引號轉義時遇到問題,您能否檢查您的製作人發送給kafka的內容? – ledniov

回答

1

看起來你對你的卡夫卡製片方有問題,你必須通過更換轉義引號與以下格式來結束:

{"datetime":"14-05-2017 14:18:30","quality":92.6}

它會給你格式正確的JSON字符串。

+0

一切正常吧!再次感謝你 – AsmaaM