我只是跟着ActorPublisher例如阿卡流,有時我得到這個消息:阿卡流OnNext不允許
java.lang.IllegalStateException:當流 沒有要求的元素onNext是不允許的,totalDemand在0
看文檔,他們解釋:
通過調用onNext發送元素的流。您可以發送 發送與流客戶請求的元素一樣多的元素。 這個金額可以用totalDemand查詢。當isActive和totalDemand> 0時,只允許 使用onNext,否則onNext將 拋出IllegalStateException。
當流訂閱者請求更多元素時,ActorPublisherMessage.Request消息被傳遞給此actor,並且 您可以對該事件進行操作。總需求會自動更新。
如何防止totalDemand爲零?當我得到這個錯誤時,我失去了我想發送的信息。
這是我一直在關注的例子:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-integrations.html
,這是我的課測試
object Test extends App {
implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorFlowMaterializer()
val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
val publisher = kafka.consume("test", "groupName", new StringDecoder())
val workerActor = actorSystem.actorOf(Props[Worker], name = "workerActor")
Source(publisher).map(WorkerPool.Msg(_, workerActor)).runWith(Sink.actorSubscriber(WorkerPool.props))
}
嗯,我收到了來自卡夫卡的消息,我傳遞給然而,WorkerActor在向Kafka發送每秒10條消息時,其中一些由於此錯誤而丟失。
UPDATE
我正面臨着這裏所描述的錯誤(使用相同的庫):
https://github.com/softwaremill/reactive-kafka/issues/11
我解決了礦用緩衝區,但看起來是這樣的公關就能解決問題。
https://github.com/softwaremill/reactive-kafka/pull/13
隱VAL materializer = ActorFlowMaterializer( ActorFlowMaterializerSettings(actorSystem) .withInputBuffer(INITIALSIZE = 1024,MAXSIZE = 1024)) –
實現上述代碼,爲我工作。謝謝。 –
好聽。我記得有一些警告使用外部緩衝區,而不是將緩衝區保留在Actor本身內。密切關注...... –