我從3行這樣一個輸入文件(實際上輸入的是一個連續的流的TCP套接字)一Source[ByteString, _]
:解碼分塊JSON與AKKA流
{"a":[2
33]
}
現在的問題是,我想要將其解析爲Source[ChangeMessage,_]
,但是我發現的唯一示例是在每行有一個完整的JSON消息時處理,而不是每個JSON消息可以分割成多行。
我發現的一個示例是this庫,但它期望}
或,
作爲最後一個字符,即每行一個JSON。下面的例子顯示了這個設置。
"My decoder" should "decode chunked json" in {
implicit val sys = ActorSystem("test")
implicit val mat = ActorMaterializer()
val file = Paths.get("chunked_json_stream.json")
val data = FileIO.fromPath(file)
.via(CirceStreamSupport.decode[ChangeMessage])
.runWith(TestSink.probe[ChangeMessage])
.request(1)
.expectComplete()
}
另一種替代方法是使用摺疊和平衡}
並且僅當完成一個整體JSON發射。這樣做的問題在於,摺疊操作符僅在流完成時發出,因爲這是連續的流,所以我不能在這裏使用它。
我的問題是:什麼是分塊解析JSON在AKKA流流 以及是否有已經做 這個任何可用的軟件以最快的方式?如果可能我想使用circe
實際上,我通過使用CirceStreamSupport而沒有任何JsonFraming工作? – user3139545
@ user3139545感謝您的評論。我澄清了我的答案。 –