2015-06-18 68 views
1

我只是跟着ActorPublisher例如阿卡流,有時我得到這個消息:阿卡流OnNext不允許

java.lang.IllegalStateException:當流 沒有要求的元素onNext是不允許的,totalDemand在0

看文檔,他們解釋:

通過調用onNext發送元素的流。您可以發送 發送與流客戶請求的元素一樣多的元素。 這個金額可以用totalDemand查詢。當isActive和totalDemand> 0時,只允許 使用onNext,否則onNext將 拋出IllegalStateException。

當流訂閱者請求更多元素時,ActorPublisherMessage.Request消息被傳遞給此actor,並且 您可以對該事件進行操作。總需求會自動更新。

如何防止totalDemand爲零?當我得到這個錯誤時,我失去了我想發送的信息。

這是我一直在關注的例子:

http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html

,這是我的課測試

object Test extends App { 

    implicit val actorSystem = ActorSystem("ReactiveKafka") 
    implicit val materializer = ActorFlowMaterializer() 

    val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181") 
    val publisher = kafka.consume("test", "groupName", new StringDecoder()) 

    val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor") 

    Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props)) 

} 

嗯,我收到了來自卡夫卡的消息,我傳遞給然而,WorkerActor在向Kafka發送每秒10條消息時,其中一些由於此錯誤而丟失。

UPDATE

我正面臨着這裏所描述的錯誤(使用相同的庫):

https://github.com/softwaremill/reactive-kafka/issues/11

我解決了礦用緩衝區,但看起來是這樣的公關就能解決問題。

https://github.com/softwaremill/reactive-kafka/pull/13

回答

2

如果往下流水槽沒有任何需求那麼你唯一的選擇是

  1. 告訴數據源供給Worker,沒有需求,使得源可停止生產的消息直到有更多需求進入(反應式解決方案)。
  2. 緩衝這些消息,直到您從接收器獲得一些可能會填滿緩衝區的請求,然後放棄消息。
  3. 當需求爲0時(這似乎是您當前的實施),丟棄消息。

但是「back-pressure」的要點是防止onNext在沒有需求時被調用。

爲了實現buffering選項上面,你可以在你的演員內部還是外部的任何緩衝:

  • 內部緩衝器:看「ActorPublisher」例如,在documentation用於緩衝的一個例子,而助長的一個演員ActorPublisher。
  • 外部緩衝區:在流中使用緩衝實現器或Flow.buffer使用外部緩衝區。
+0

隱VAL materializer = ActorFlowMaterializer( ActorFlowMaterializerSettings(actorSystem) .withInputBuffer(INITIALSIZE = 1024,MAXSIZE = 1024)) –

+0

實現上述代碼,爲我工作。謝謝。 –

+0

好聽。我記得有一些警告使用外部緩衝區,而不是將緩衝區保留在Actor本身內。密切關注...... –