在0.11或更高版本中,您可以運行bin/kafka-delete-records.sh命令來標記要刪除的消息。
https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh
例如發佈100個消息
seq 100 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
然後刪除那些100個的消息90與新kafka-delete-records.sh 命令行工具
./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json
其中offsetfile.json包含
{"partitions": [{"topic": 「mytest", "partition": 0, "offset": 90}], "version":1 }
然後消耗從頭開始的消息以驗證消息中的90個確實標記爲已刪除。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning
91
92
93
94
95
96
97
98
99
100
您首先提到的保留時間技巧要好得多。第二種方式會導致複製主題出現問題,並導致主題的元數據與實際情況不一致。請注意,偏移號碼不會回到零。 – dawsaw