2017-09-14 55 views
1

有沒有方法可以在不刪除卡夫卡主題的情況下刪除隊列消息?
我想在激活使用者時刪除隊列消息。如何在不刪除主題的情況下刪除/清除卡夫卡排隊郵件主題

我知道有幾種方法,如:

  1. 重置保留時間

    $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --config retention.ms=1000

  2. 刪除文件卡夫卡

    $ rm -rf /data/kafka-logs/<topic/Partition_name>

+0

您首先提到的保留時間技巧要好得多。第二種方式會導致複製主題出現問題,並導致主題的元數據與實際情況不一致。請注意,偏移號碼不會回到零。 – dawsaw

回答

2

在0.11或更高版本中,您可以運行bin/kafka-delete-records.sh命令來標記要刪除的消息。

https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh

例如發佈100個消息

seq 100 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest 

然後刪除那些100個的消息90與新kafka-delete-records.sh 命令行工具

./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json 

其中offsetfile.json包含

{"partitions": [{"topic": 「mytest", "partition": 0, "offset": 90}], "version":1 } 

然後消耗從頭開始的消息以驗證消息中的90個確實標記爲已刪除。

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning 
91 
92 
93 
94 
95 
96 
97 
98 
99 
100 
+0

感謝Hans的回覆!這與我一直想要的非常接近。 你是否知道我是否可以在不知道有多少消息排隊的情況下刪除所有偏移量? 我可以通過編輯json來做到這一點嗎? – Tachikoma

+0

是的,你可以刪除所有消息。你也可以跳過使用這個工具,並查看源代碼並編寫自己的程序,直接調用相同的API在任何給定的偏移量(包括最新的偏移量)之前刪除記錄,或者可以通過時間戳查找偏移量以刪除所有記錄在特定時間之前。這個工具的使用應在卡夫卡1.0可以更好地記錄的API,它是列於2017年10月 –

+0

我發現 {「分區」: [{ 「主題」:「測試」, 「分區」:0, 「offset」:-1 }], 「version」:1 } 可以清除主題中的所有消息。 我的下一個問題是,這會刪除不同組的相同主題。 例如,如果主題「mytest」在消費者組「group1」中,並且「group2」正在觀看主題「mytest」,則該工具將刪除兩個組中的所有消息。 有什麼辦法可以阻止它? – Tachikoma