基本上這裏是我使用的代碼。akka stream閱讀時無盡http流反壓
當我用curl進行連接時,我發現curl命令中的所有實體都非常快。當我嘗試模仿與akka相同的行爲時,打印出我得到的元素之間會有很大的停頓。
流動波紋管在某種程度上受到了壓力,並且在前4條消息之後,1條消息的其餘部分會在明顯的打印線之後到達。
前4條消息大約是2k JSON,最後一條沒有。 5是80k JSON。
最後一個實體(編號5)也是最大的塊,我得到它在流完成之前打印的印象。而且我非常積極,只有2-3秒的運行時間。
知道爲什麼這流中讀取前4個元素
val awesomeHttpReq = Http().singleRequest(
HttpRequest(
method = GET,
uri = Uri("http://some-service-providing-endless-http.stream")
)
)
val a = Source.fromFuture(awesomeHttpReq).flatMapConcat {
case HttpResponse(status, _, entity, _) =>
// I saw some comments the back pressure might kick in
// because I might not be consuming the bytes here properly
// but this is totally in line with all the examples etc.
entity.withoutSizeLimit.getDataBytes.via(Framing delimiter (ByteString("\n"), Int.MaxValue))
} map { bytes =>
parse(bytes decodeString StandardCharsets.UTF_8).fold(pf => throw new IllegalStateException(s"unable to parse: $pf"), identity[Json])
} mapConcat { items =>
// every line that comes in from previous stage contains
// key elements - this I'm interested in, it's an array
items.asObject flatMap (_.toMap get "events") flatMap (_ asArray) getOrElse Nil
}
val b: Future[Vector[Json]] = a
.takeWithin(50 second)
.runWith(Sink.fold(Vector.empty[Json])((a, b) => {
// I'm using this to see what's going on in the stream
// there are significant pauses between the entities
// in reality the elements are available in the stream (all 5)
// within 2-3 seconds
// and this printing just has very big pause after first 4 elements
println(s"adding\n\n\n ${b.noSpaces}")
a :+ b
}))
Await.result(b, 1 minute)
後只是掛我看了一下這個問題似乎真的接近我有https://github.com/akka/akka-http/issues/57,但不知何故未能找到我的情況下,一些有幫助的。
我也試過改變一下akka http的塊大小,並沒有真正的幫助。
這裏有傳入消息的時機: 從流初始化:
1. 881 ms
2. 889 ms
3. 894 ms
4. 898 ms
// I don't understand why this wait time of 30 seconds in betweeen
5. 30871 ms
的最後一條消息顯然某處掛起30秒
任何想法真的可以理解。
更新:
因爲它是真正奇怪的是,前4種元素在4擺脫一貫暨第五屆一個正在等待了30秒,我決定從默認的4增加initial-input-buffer-size = 4
到16,現在它按預期工作。我只是無法理解上面代碼中背壓的起因。
更新2:
緩衝區大小有助於我的簡單示例。但在我的真正的問題我有一些很奇怪的事情:
entity.withoutSizeLimit.dataBytes
.alsoTo(Sink.foreach(a => println("stage 1 " + a.decodeString(StandardCharsets.UTF_8))))
.via(Framing delimiter (ByteString("\n"), Int.MaxValue))
.buffer(1000, OverflowStrategy.backpressure)
.alsoTo(Sink.foreach(a => println("stage 2 " + a.decodeString(StandardCharsets.UTF_8))))
我可以看到我的取景之前所需要的信息(階段1),但之後在日誌(第2階段)。我確信有足夠的空間可以通過放置緩衝區來實現。
現在我已經找到了新的行字符不會真正走進盈階段(第一階段),這是何等的每一行通常結束:
"7da".sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toChar).mkString
res12: String =
"}
"
在我的最後一個項目我m錯過了最後一個字節a
,基本上新行並沒有進入框架。所以整個事情不會發射。
有趣的是,我想知道你是否可以在不使用akka-http的情況下重現這種情況,即將一些源JSON轉儲到文件中,並使用'Source.fromFile'代替http請求。 –
當我剛從捲曲轉儲它的作品。此外,我現在嘗試'initial-input-buffer-size = 16',它按預期工作......這真的很奇怪,看起來背壓在某處。但無法弄清楚在哪裏。 –
用文件作爲流嘗試,使用與此處相同的代碼。我不會遇到這個問題:(現在讓我有點瘋狂:D –