2017-10-21 85 views
1

基本上這裏是我使用的代碼。akka stream閱讀時無盡http流反壓

當我用curl進行連接時,我發現curl命令中的所有實體都非常快。當我嘗試模仿與akka相同的行爲時,打印出我得到的元素之間會有很大的停頓。

流動波紋管在某種程度上受到了壓力,並且在前4條消息之後,1條消息的其餘部分會在明顯的打印線之後到達。

前4條消息大約是2k JSON,最後一條沒有。 5是80k JSON。

最後一個實體(編號5)也是最大的塊,我得到它在流完成之前打印的印象。而且我非常積極,只有2-3秒的運行時間。

知道爲什麼這流中讀取前4個元素

val awesomeHttpReq = Http().singleRequest(
    HttpRequest(
    method = GET, 
    uri = Uri("http://some-service-providing-endless-http.stream") 
) 
) 

val a = Source.fromFuture(awesomeHttpReq).flatMapConcat { 
    case HttpResponse(status, _, entity, _) => 
    // I saw some comments the back pressure might kick in 
    // because I might not be consuming the bytes here properly 
    // but this is totally in line with all the examples etc. 

    entity.withoutSizeLimit.getDataBytes.via(Framing delimiter (ByteString("\n"), Int.MaxValue)) 
} map { bytes => 
    parse(bytes decodeString StandardCharsets.UTF_8).fold(pf => throw new IllegalStateException(s"unable to parse: $pf"), identity[Json]) 
} mapConcat { items => 
    // every line that comes in from previous stage contains 
    // key elements - this I'm interested in, it's an array 
    items.asObject flatMap (_.toMap get "events") flatMap (_ asArray) getOrElse Nil 
} 

val b: Future[Vector[Json]] = a 
    .takeWithin(50 second) 
    .runWith(Sink.fold(Vector.empty[Json])((a, b) => { 

    // I'm using this to see what's going on in the stream 
    // there are significant pauses between the entities 
    // in reality the elements are available in the stream (all 5) 
    // within 2-3 seconds 
    // and this printing just has very big pause after first 4 elements 

    println(s"adding\n\n\n ${b.noSpaces}") 
    a :+ b 
    })) 

Await.result(b, 1 minute) 

後只是掛我看了一下這個問題似乎真的接近我有https://github.com/akka/akka-http/issues/57,但不知何故未能找到我的情況下,一些有幫助的。

我也試過改變一下akka http的塊大小,並沒有真正的幫助。

這裏有傳入消息的時機: 從流初始化:

1. 881 ms 
2. 889 ms 
3. 894 ms 
4. 898 ms 
// I don't understand why this wait time of 30 seconds in betweeen 
5. 30871 ms 

的最後一條消息顯然某處掛起30秒

任何想法真的可以理解。

更新:

因爲它是真正奇怪的是,前4種元素在4擺脫一貫暨第五屆一個正在等待了30秒,我決定從默認的4增加initial-input-buffer-size = 4到16,現在它按預期工作。我只是無法理解上面代碼中背壓的起因。

更新2:

緩衝區大小有助於我的簡單示例。但在我的真正的問題我有一些很奇怪的事情:

entity.withoutSizeLimit.dataBytes 
    .alsoTo(Sink.foreach(a => println("stage 1 " + a.decodeString(StandardCharsets.UTF_8)))) 
    .via(Framing delimiter (ByteString("\n"), Int.MaxValue)) 
    .buffer(1000, OverflowStrategy.backpressure) 
    .alsoTo(Sink.foreach(a => println("stage 2 " + a.decodeString(StandardCharsets.UTF_8)))) 

我可以看到我的取景之前所需要的信息(階段1),但之後在日誌(第2階段)。我確信有足夠的空間可以通過放置緩衝區來實現。

現在我已經找到了新的行字符不會真正走進盈階段(第一階段),這是何等的每一行通常結束:

"7da".sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toChar).mkString 
res12: String = 
"} 
" 

在我的最後一個項目我m錯過了最後一個字節a,基本上新行並沒有進入框架。所以整個事情不會發射。

+0

有趣的是,我想知道你是否可以在不使用akka-http的情況下重現這種情況,即將一些源JSON轉儲到文件中,並使用'Source.fromFile'代替http請求。 –

+0

當我剛從捲曲轉儲它的作品。此外,我現在嘗試'initial-input-buffer-size = 16',它按預期工作......這真的很奇怪,看起來背壓在某處。但無法弄清楚在哪裏。 –

+0

用文件作爲流嘗試,使用與此處相同的代碼。我不會遇到這個問題:(現在讓我有點瘋狂:D –

回答

1

經過相當多的調查後,我決定解決這個問題,因爲它看起來像存在多種因素的組合。整個問題的輸入源實際上是我公司使用的背景中帶有kafka的專有企業服務總線:https://github.com/zalando/nakadi

通過上述症狀,我想也許是系統沒有根據文檔,他們可能無法通過追加來發送\n,但他們也坦然以每行玩,但是這僅僅是不是因爲我查的情況下在代碼:https://github.com/zalando/nakadi/blob/0859645b032d19f7baa919877f72cb076f1da867/src/main/java/org/zalando/nakadi/service/EventStreamWriterString.java#L36

看到這個之後,我嘗試用這個例子來模擬整個事情:

build.sbt

name := "test-framing" 

version := "0.1" 

scalaVersion := "2.12.4"  

lazy val akkaVersion = "2.5.6" 
lazy val akkaHttpVersion = "10.0.10" 

libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-stream" % akkaVersion, 
    "com.typesafe.akka" %% "akka-http" % akkaHttpVersion, 
    "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion 
) 

scalacOptions in Compile ++= (scalacOptions in Compile).value :+ "-Yrangepos" 

* TestApp.scala - 在那裏我有這個問題在我的代碼*

import java.nio.charset.StandardCharsets 

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model._ 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Framing, Sink, Source} 
import akka.util.ByteString 

import scala.concurrent.duration._ 
import scala.concurrent.{Await, Future} 

object TestApp extends App { 

    implicit val system = ActorSystem("MyAkkaSystem") 
    implicit val materializer = ActorMaterializer() 

    val awesomeHttpReq = Http().singleRequest(
    HttpRequest(
     method = HttpMethods.GET, 
     uri = Uri("http://localhost:9000/streaming-json") 
    ) 
) 

    val a = Source.fromFuture(awesomeHttpReq).flatMapConcat { 
    case HttpResponse(status, _, entity, _) => 
     entity.withoutSizeLimit.getDataBytes 
     .via(Framing delimiter (ByteString("\n"), Int.MaxValue)) 
    } map { bytes => 
    bytes decodeString StandardCharsets.UTF_8 
    } 

    val b: Future[Vector[String]] = a 
    .takeWithin(50 second) 
    .runWith(Sink.fold(Vector.empty[String])((a, b) => { 
     println(s"adding $b") 
     a :+ b 
    })) 

    Await.result(b, 1 minute) 

} 

*模擬終點*

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.common.EntityStreamingSupport 
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport 
import akka.http.scaladsl.server.Directives 
import akka.stream.scaladsl.{Flow, Source} 
import akka.stream.{ActorMaterializer, ThrottleMode} 
import akka.util.ByteString 
import spray.json._ 

import scala.concurrent.duration._ 
import scala.io.StdIn 

object TestApp2 extends App { 

    implicit val system = ActorSystem("MyAkkaSystem") 
    implicit val materializer = ActorMaterializer() 

    implicit val executionContext = system.dispatcher 

    case class SomeData(name: String) 

    trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { 
    implicit val someFormat = jsonFormat1(SomeData) 
    } 

    val start = ByteString.empty 
    val sep = ByteString("\n") 
    val end = ByteString.empty 

    implicit val jsonStreamingSupport = EntityStreamingSupport 
    .json() 
    .withFramingRenderer(Flow[ByteString].intersperse(sep)) 

    object MyJsonService extends Directives with JsonSupport { 

    def streamingJsonRoute = 
     path("streaming-json") { 
     get { 
      val sourceOfNumbers = Source(1 to 1000000) 

      val sourceOfDetailedMessages = 
      sourceOfNumbers 
       .map(num => SomeData(s"Hello $num")) 
       .throttle(elements = 5, 
         per = 30 second, 
         maximumBurst = 6, 
         mode = ThrottleMode.Shaping) 

      complete(sourceOfDetailedMessages) 
     } 
     } 
    } 

    val bindingFuture = 
    Http().bindAndHandle(MyJsonService.streamingJsonRoute, "localhost", 9000) 

    println(s"Server online at http://localhost:9000/\nPress RETURN to stop...") 
    StdIn.readLine() // let it run until user presses return 
    bindingFuture 
    .flatMap(_.unbind()) // trigger unbinding from the port 
    .onComplete(_ => system.terminate()) // and shutdown when done 

} 

在模擬端點我的行爲如預期,所以沒有任何關於阿卡的錯誤。

當多個圖書館+ nakadi彙集在一起​​,但這只是鵝狩獵仍然可能會有一些問題。最後,如果我將batch_flush_timeout降低到某個較低值,服務器實際上會將下一個事件發送到管道中,因此管道中最後一個消息將被推送到我的應用程序層,以便我可以處理它。

基本上所有這些文字都是因爲一個單獨的字節在某種程度上不會成幀,但是在過去的幾天我又學到了很多關於akka流的知識。