2017-08-30 81 views
0

這裏是akka documentation水槽倍的阿卡流Source.actorRef緩衝和OverflowStrategy

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right) 

val (ref, future) = Source.actorRef(3, OverflowStrategy.fail) 
    .toMat(sinkUnderTest)(Keep.both).run() 

ref ! 1 
ref ! 2 
ref ! 3 
ref ! akka.actor.Status.Success("done") 

val result = Await.result(future, 3.seconds) 
assert(result == "123") 

的代碼片段是一個工作的代碼片段,但是,如果我用裁判來告訴另一個消息像ref ! 4,我一個例外如akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)

我想緩衝區大小3應該足夠了。原因在於摺疊操作是(acc,ele)=> acc,因此需要累加器和元素返回新值累加器。

所以我改變了代碼讓另一個演員等待3秒。它正在重新工作。

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right) 

    private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run() 

    ref ! 1 
    ref ! 2 
    ref ! 3 
    Thread.sleep(3000) 
    ref ! 4 
    ref ! akka.actor.Status.Success("done") 

    val result = Await.result(future, 10.seconds) 

    println(result) 

但是,我的問題是,有沒有一種方法可以告訴阿卡流減慢或等待水槽可用。我也使用OverflowStrategy.backpressure,但它表示Backpressure overflowStrategy not supported

還有其他的選擇嗎?

回答

4

您應該考慮將Source.queue作爲一種將元素從外部注入流中的方式,並以背壓感知的方式注入。

Source.queue將實現爲您可以提供元素的隊列對象,但是當您提供它們時,您將返回Future,當流準備好接受消息時,該對象將完成。下面

實施例:在docs

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right) 

    val (queue, future): (SourceQueueWithComplete[Int], Future[String]) = 
    Source.queue(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run() 

    Future.sequence(Seq(
    queue.offer(1), 
    queue.offer(2), 
    queue.offer(3), 
    queue.offer(4) 
)) 

    queue.complete() 

    val result = Await.result(future, 10.seconds) 

    println(result) 

更多信息。

+0

感謝您的回覆。你也可以提供代碼片段嗎?我對來自'Source.queue'的'Enqueued'和'offer'有點困惑。 –

+0

增加了一個例子 –