我有以下流:阿卡流TCP +阿卡流卡夫卡生產者未停止不發佈消息,而不是錯誤-ING出
Source(IndexedSeq(ByteString.empty))
.via(
Tcp().outgoingConnection(bsAddress, bsPort)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
它正常工作了一段時間,我可以消耗填充上的消息卡夫卡話題。但有時候,顯然是隨機的間隔,沒有更多的消息發佈,並且這段代碼沒有記錄任何錯誤(printAndByeBye將打印傳遞的消息並終止參與者系統。)重新啓動應用程序後,消息繼續流。
關於如何知道這裏發生了什麼的任何想法?
編輯:我把卡蒙它,我可以看到以下行爲:
它看起來像是停不通知流應該停止,但我不知道如何明確並停止流。
流不失敗的。沒有錯誤記錄。 – Darien