2016-12-08 50 views
0

我正在嘗試追蹤自定義流程中存在額外請求向上遊的錯誤。我寫了一個測試案例,鑑於我看到的行爲和日誌記錄應該失敗,但事實並非如此。爲什麼Akka Streams TestSource沒有收到每一次拉取請求?

我然後做一個平凡的流量明顯帶有額外拉動,但測試仍然沒有失敗。

下面是一個例子:

class EagerFlow extends GraphStage[FlowShape[String, String]] { 

    val in: Inlet[String] = Inlet("EagerFlow.in") 
    val out: Outlet[String] = Outlet("EagerFlow.out") 

    override def shape: FlowShape[String, String] = FlowShape(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { 
    setHandler(in, new InHandler { 
     override def onPush() = { 
     println("EagerFlow.in.onPush") 
     println("EagerFlow.out.push") 
     push(out, grab(in)) 
     println("EagerFlow.in.pull") 
     pull(in) 
     } 
    }) 
    setHandler(out, new OutHandler { 
     override def onPull() = { 
     println("EagerFlow.out.onPull") 
     println("EagerFlow.in.pull") 
     pull(in) 
     } 
    }) 
    } 
} 

下面是應該通過測試,即確認是否有一個額外的請求。

"Eager Flow" should "pull on every push" in { 
    val sourceProbe = TestSource.probe[String] 
    val sinkProbe = TestSink.probe[String] 

    val eagerFlow = Flow.fromGraph(new EagerFlow) 

    val (source, sink) = sourceProbe. 
    via(eagerFlow). 
    toMat(sinkProbe)(Keep.both). 
    run 

    sink.request(1) 
    source.expectRequest() 
    source.expectNoMsg() 

    source.sendNext("hello") 
    sink.expectNext() 
    source.expectRequest() 
} 

在標準輸出,你可以看到:

0C3E08A8 PULL DownstreamBoundary -> [email protected] ([email protected]) [[email protected]] 
EagerFlow.out.onPull 
EagerFlow.in.pull 
0C3E08A8 PULL [email protected] -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary] 
EagerFlow.in.onPush 
EagerFlow.out.push 
EagerFlow.in.pull 
0C3E08A8 PULL [email protected] -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary] 

assertion failed: timeout (3 seconds) during expectMsg: 
    java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg: 
    at scala.Predef$.assert(Predef.scala:170) 
at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:368) 
at akka.testkit.TestKit.expectMsgPF(TestKit.scala:737) 
at akka.stream.testkit.StreamTestKit$PublisherProbeSubscription.expectRequest(StreamTestKit.scala:665) 
at akka.stream.testkit.TestPublisher$Probe.expectRequest(StreamTestKit.scala:172) 

(這與調試日誌,實際使用的打印點,但實際上是相同的事情做好)

爲什麼這在UpstreamBoundary拉停止,並永遠不會回到TestSource.probe

我有使用expectNoMsg()以確保正確的背壓相當多的測試,但似乎這些只是給誤報。如果這不起作用,我應該如何測試?

回答

1

爲了使您的測試應用,您需要設置您的流緩衝區的大小爲1

val settings = ActorMaterializerSettings(system) 
    .withInputBuffer(initialSize = 1, maxSize = 1) 
implicit val mat = ActorMaterializer(settings) 

的默認大小爲:

# Initial size of buffers used in stream elements 
    initial-input-buffer-size = 4 
    # Maximum size of buffers used in stream elements 
    max-input-buffer-size = 16 

這意味着,阿卡會急切地根據您的第一次請求,請求和攝取多達16個元素無論如何。有關內部緩衝區的更多信息,請參閱here

+0

是的,你是對的。我可以看到'BatchingActorInputBoundary'爲16個元素調用'tryRequest'。還有一點倒退,首先是如何創建一個異步邊界。我會認爲TestKit提供了一些讓個人測試更直觀的測試。我沒有意識到的另一件事是'Probe.expectRequest()'允許多個並返回數字。如果這個複數名稱會更明顯。 – Steiny

相關問題