2017-06-22 46 views
1

本主題應僅包含每個X的最新'文檔​​X更新'事件。但我無法正確配置主題並保留多個副本。如何配置用作快照存儲的卡夫卡主題

我的想法是保持細分,以及所有相關超時,清空和保留時間。

主題設置(我沒有哪裏有什麼前綴的每個選項和一個足夠清晰的認識應用,所以有可能是一些未使用的和無關的內容以及誇張的數字 - 更正歡迎):

"cleanup.policy"     -> "compact", 
"file.delete.delay.ms"    -> "10", 
"segment.bytes"      -> "10000", 
"delete.retention.ms"    -> "10", 
"retention.bytes"     -> "10000", 
"segment.ms"      -> "10", 
"retention.ms"      -> "10", 
"min.cleanable.dirty.ratio"   -> "0.001", 
"flush.messages"     -> "1", 
"flush.ms"       -> "10", 
"min.compaction.lag.ms"    -> "1", 
"log.cleaner.min.compaction.lag.ms" -> "1" 

我喂話題akka-streams-kafka

val ids = List("12345", ...) 

val publish: Future[Done] = Source(ids ++ ids ++ ids ++ ids ++ ids) 
    .map { id => 
    ProducerMessage.Message(new ProducerRecord[String, String](topic, id, id), id) 
    } 
    .via(producerFlow) 
    .map(logResult) 
    .runWith(Sink.ignore) 
Await.result(publish, 3.seconds) 

等待幾秒鐘後,我算的消息:

var count = 0 
val runCount = Consumer 
    .plainSource(consumerSettings, Subscriptions.topics(topic)) 
    .map { t => 
    count += 1 
    t 
    } 
    .runWith(Sink.ignore) 
Try { Await.result(runCount, timeout) } 

我希望消費者能夠收到ids.length消息,但它總是會在第一次運行時接收所有生成的消息,而在後續運行時會收到更多消息。

確實發生了一些壓縮 - 如果我多次運行測試,消耗的消息數停止增長,並且我看到kafka日誌中的段刪除 - 但每個密鑰仍有多個消息。

如何將卡夫卡主題用作快照存儲?

使用kafka 0.10.2.1

謝謝。

+0

我需要一些細節來調查:1.請問您可以提供經紀人日誌,2.經紀人級別的配置將有所幫助。 –

+0

@SudheshRajan當然,這裏是[重複測試執行過程中來自broker.log的行](https://gist.github.com/ksilin/095353de745ce8707d6150eae6796c18),這裏是[server.properties](https:// gist .github.com/ksilin/415964ec885d5e7c695986046c04c65b)。 'server.properties'是香草。更多我可以提供的信息? – kostja

回答

1

根據卡夫卡規格:「日誌壓實確保卡夫卡將始終保留至少用於單個主題分區的記錄數據中的每個消息密鑰的最後已知值」。即Kafka並不保證每個密鑰只保留一條消息,但它保證每個密鑰始終具有最新的消息版本。

+0

日誌壓縮似乎是要走的路 – PragmaticProgrammer

+0

這是每個主題分區的最新部分,它保持打開供寫入,並且沒有壓縮(直到它翻轉並且新的分段文件成爲寫入活動部分)。否則,所有其他段在壓縮週期結束後應該只有每個鍵的一條消息。 –

0

您可以嘗試解決配置問題以查看是否可以實現您想要的操作(請參閱this),但我建議在應用程序級別處理它,僅將最新消息與該密鑰一起用作有效消息,因爲日誌壓縮運行在一個單獨的線程上,每次更新後都無法觸發它(即使有辦法,效率也不會很高)。