0
我正在嘗試追蹤自定義流程中存在額外請求向上遊的錯誤。我寫了一個測試案例,鑑於我看到的行爲和日誌記錄應該失敗,但事實並非如此。爲什麼Akka Streams TestSource沒有收到每一次拉取請求?
我然後做一個平凡的流量明顯帶有額外拉動,但測試仍然沒有失敗。
下面是一個例子:
class EagerFlow extends GraphStage[FlowShape[String, String]] {
val in: Inlet[String] = Inlet("EagerFlow.in")
val out: Outlet[String] = Outlet("EagerFlow.out")
override def shape: FlowShape[String, String] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush() = {
println("EagerFlow.in.onPush")
println("EagerFlow.out.push")
push(out, grab(in))
println("EagerFlow.in.pull")
pull(in)
}
})
setHandler(out, new OutHandler {
override def onPull() = {
println("EagerFlow.out.onPull")
println("EagerFlow.in.pull")
pull(in)
}
})
}
}
下面是應該通過測試,即確認是否有一個額外的請求。
"Eager Flow" should "pull on every push" in {
val sourceProbe = TestSource.probe[String]
val sinkProbe = TestSink.probe[String]
val eagerFlow = Flow.fromGraph(new EagerFlow)
val (source, sink) = sourceProbe.
via(eagerFlow).
toMat(sinkProbe)(Keep.both).
run
sink.request(1)
source.expectRequest()
source.expectNoMsg()
source.sendNext("hello")
sink.expectNext()
source.expectRequest()
}
在標準輸出,你可以看到:
0C3E08A8 PULL DownstreamBoundary -> [email protected] ([email protected]) [[email protected]]
EagerFlow.out.onPull
EagerFlow.in.pull
0C3E08A8 PULL [email protected] -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary]
EagerFlow.in.onPush
EagerFlow.out.push
EagerFlow.in.pull
0C3E08A8 PULL [email protected] -> UpstreamBoundary (BatchingActorInputBoundary(id=0, fill=0/16, completed=false, canceled=false)) [UpstreamBoundary]
assertion failed: timeout (3 seconds) during expectMsg:
java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg:
at scala.Predef$.assert(Predef.scala:170)
at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:368)
at akka.testkit.TestKit.expectMsgPF(TestKit.scala:737)
at akka.stream.testkit.StreamTestKit$PublisherProbeSubscription.expectRequest(StreamTestKit.scala:665)
at akka.stream.testkit.TestPublisher$Probe.expectRequest(StreamTestKit.scala:172)
(這與調試日誌,實際使用的打印點,但實際上是相同的事情做好)
爲什麼這在UpstreamBoundary
拉停止,並永遠不會回到TestSource.probe
?
我有使用expectNoMsg()
以確保正確的背壓相當多的測試,但似乎這些只是給誤報。如果這不起作用,我應該如何測試?
是的,你是對的。我可以看到'BatchingActorInputBoundary'爲16個元素調用'tryRequest'。還有一點倒退,首先是如何創建一個異步邊界。我會認爲TestKit提供了一些讓個人測試更直觀的測試。我沒有意識到的另一件事是'Probe.expectRequest()'允許多個並返回數字。如果這個複數名稱會更明顯。 – Steiny