這裏是akka documentation水槽倍的阿卡流Source.actorRef緩衝和OverflowStrategy
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (ref, future) = Source.actorRef(3, OverflowStrategy.fail)
.toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 3.seconds)
assert(result == "123")
的代碼片段是一個工作的代碼片段,但是,如果我用裁判來告訴另一個消息像ref ! 4
,我一個例外如akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)
我想緩衝區大小3應該足夠了。原因在於摺疊操作是(acc,ele)=> acc,因此需要累加器和元素返回新值累加器。
所以我改變了代碼讓另一個演員等待3秒。它正在重新工作。
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
Thread.sleep(3000)
ref ! 4
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 10.seconds)
println(result)
但是,我的問題是,有沒有一種方法可以告訴阿卡流減慢或等待水槽可用。我也使用OverflowStrategy.backpressure
,但它表示Backpressure overflowStrategy not supported
。
還有其他的選擇嗎?
感謝您的回覆。你也可以提供代碼片段嗎?我對來自'Source.queue'的'Enqueued'和'offer'有點困惑。 –
增加了一個例子 –