2017-05-26 86 views
3

我從3行這樣一個輸入文件(實際上輸入的是一個連續的流的TCP套接字)一Source[ByteString, _]解碼分塊JSON與AKKA流

{"a":[2 
33] 
} 

現在的問題是,我想要將其解析爲Source[ChangeMessage,_],但是我發現的唯一示例是在每行有一個完整的JSON消息時處理,而不是每個JSON消息可以分割成多行。

我發現的一個示例是this庫,但它期望},作爲最後一個字符,即每行一個JSON。下面的例子顯示了這個設置。

"My decoder" should "decode chunked json" in { 
    implicit val sys = ActorSystem("test") 
    implicit val mat = ActorMaterializer() 
    val file = Paths.get("chunked_json_stream.json") 
    val data = FileIO.fromPath(file) 
    .via(CirceStreamSupport.decode[ChangeMessage]) 
    .runWith(TestSink.probe[ChangeMessage]) 
    .request(1) 
    .expectComplete() 
    } 

另一種替代方法是使用摺疊和平衡}並且僅當完成一個整體JSON發射。這樣做的問題在於,摺疊操作符僅在流完成時發出,因爲這是連續的流,所以我不能在這裏使用它。

我的問題是:什麼是分塊解析JSON在AKKA流流 以及是否有已經做 這個任何可用的軟件以最快的方式?如果可能我想使用circe

回答

2

由於knutwalker/akka-stream-json文檔說:

這種流動甚至支持在解析任何碎片他們可能會到達,這是偉大的消耗基於流/ SSE API的多JSON文檔。

在你的情況下,所有你需要做的是剛剛劃定傳入字節串:

"My decoder" should "decode chunked json" in { 
    implicit val sys = ActorSystem("test") 
    implicit val mat = ActorMaterializer() 
    val file = Paths.get("chunked_json_stream.json") 

    val sourceUnderTest = 
     FileIO.fromPath(file) 
     .via(Framing.delimiter(ByteString("\n"), 8192, allowTruncation = true)) 
     .via(CirceStreamSupport.decode[ChangeMessage]) 

    sourceUnderTest 
     .runWith(TestSink.probe[ChangeMessage]) 
     .request(1) 
     .expectNext(ChangeMessage(List(233))) 
     .expectComplete() 
} 

那是因爲從文件中讀取數據時,字節串元素包含多行,因此瑟茜無法解析畸形jsons。當用新行分隔時,流中的每個元素都是一個單獨的行,因此Circe能夠使用前面提到的功能來解析它。

+0

實際上,我通過使用CirceStreamSupport而沒有任何JsonFraming工作? – user3139545

+0

@ user3139545感謝您的評論。我澄清了我的答案。 –

0

不幸的是,我不知道任何支持基於流的JSON解析的Scala庫。它似乎對我來說,谷歌Gson有一些支持,但我不完全確定它可以正確處理「損壞」的輸入。

但是,您可以做的是收集流式傳輸的JSON文檔,與Framing.delimiter相似。這與您提到的替代方法非常相似,但它並未使用fold();如果你這樣做,你可能需要模仿Framing.delimiter做的事情,但不是尋找一個單獨的分隔符,你需要平衡大括號(如果頂級數組是可能的話,可以選擇括號),緩存中間數據,直到整個文檔通過,你會發出一個適合解析的單個塊。

正如一個側面說明,對適用於阿卡流中使用流JSON解析器可能看起來像這樣適當的接口:

trait Parser { 
    def update(data: Array[Byte]) // or String 
    def pull(): Option[Either[Error, JsonEvent]] 
} 

其中pull()回報None,如果它不能再讀書,但是有在傳入文檔中沒有實際的語法錯誤,並且JsonEvent是用於描述流解析器的事件的一些標準結構(即,具有諸如BeginObjectBeginArray,EndObject,EndArrayString等的子類的密封特徵)。如果找到這樣一個庫或創建一個庫,則可以使用它來解析來自ByteString的Akka流的數據。