本主題應僅包含每個X的最新'文檔X更新'事件。但我無法正確配置主題並保留多個副本。如何配置用作快照存儲的卡夫卡主題
我的想法是保持細分,以及所有相關超時,清空和保留時間。
主題設置(我沒有哪裏有什麼前綴的每個選項和一個足夠清晰的認識應用,所以有可能是一些未使用的和無關的內容以及誇張的數字 - 更正歡迎):
"cleanup.policy" -> "compact",
"file.delete.delay.ms" -> "10",
"segment.bytes" -> "10000",
"delete.retention.ms" -> "10",
"retention.bytes" -> "10000",
"segment.ms" -> "10",
"retention.ms" -> "10",
"min.cleanable.dirty.ratio" -> "0.001",
"flush.messages" -> "1",
"flush.ms" -> "10",
"min.compaction.lag.ms" -> "1",
"log.cleaner.min.compaction.lag.ms" -> "1"
我喂話題akka-streams-kafka:
val ids = List("12345", ...)
val publish: Future[Done] = Source(ids ++ ids ++ ids ++ ids ++ ids)
.map { id =>
ProducerMessage.Message(new ProducerRecord[String, String](topic, id, id), id)
}
.via(producerFlow)
.map(logResult)
.runWith(Sink.ignore)
Await.result(publish, 3.seconds)
等待幾秒鐘後,我算的消息:
var count = 0
val runCount = Consumer
.plainSource(consumerSettings, Subscriptions.topics(topic))
.map { t =>
count += 1
t
}
.runWith(Sink.ignore)
Try { Await.result(runCount, timeout) }
我希望消費者能夠收到ids.length
消息,但它總是會在第一次運行時接收所有生成的消息,而在後續運行時會收到更多消息。
確實發生了一些壓縮 - 如果我多次運行測試,消耗的消息數停止增長,並且我看到kafka日誌中的段刪除 - 但每個密鑰仍有多個消息。
如何將卡夫卡主題用作快照存儲?
使用kafka 0.10.2.1
謝謝。
我需要一些細節來調查:1.請問您可以提供經紀人日誌,2.經紀人級別的配置將有所幫助。 –
@SudheshRajan當然,這裏是[重複測試執行過程中來自broker.log的行](https://gist.github.com/ksilin/095353de745ce8707d6150eae6796c18),這裏是[server.properties](https:// gist .github.com/ksilin/415964ec885d5e7c695986046c04c65b)。 'server.properties'是香草。更多我可以提供的信息? – kostja