#!/bin/zsh
zk_servers=('10.138.0.8' '10.138.0.9' '10.138.0.16')
kafka_servers=('10.138.0.13:9092' '10.138.0.14:9092')
topics=('t1' 't2' 't1_failed' 't2_failed')
NORMAL=$(tput sgr0)
GREEN=$(tput setaf 2; tput bold)
YELLOW=$(tput setaf 3)
RED=$(tput setaf 1)
function red() {
echo -e "$RED$*$NORMAL"
}
function green() {
echo -e "$GREEN$*$NORMAL"
}
function yellow() {
echo -e "$YELLOW$*$NORMAL"
}
for topic in $topics; do
yellow "Cleaning up messages in topic @ " $topic
yellow "=============================================================="
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --config retention.ms=100
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic
done
red "Waiting 120 seconds for messages to expire"
sleep 120
for topic in $topics; do
green "Restoring config of topic @ " $topic
green "=============================================================="
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --delete-config retention.ms
$KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic
$KAFKA/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $kafka_servers --topic $topic
done
當我運行此腳本時 - 我可以看到config.retention.ms已更改爲100ms,但延遲120秒後 - 我仍然在所有kafka主題中看到相同的消息。卡夫卡的retention.ms沒有被卡夫卡0.10.2強制執行?
那麼如何清除郵件?
感謝, 梅德
此處描述的行爲適用於0.10.2之前的版本。上次修改時間不再使用。而是使用時間索引。 – dawsaw
你碰巧有鏈接或KIP嗎?目前的文檔仍然說「數據一次被刪除一個日誌段,日誌管理器允許可插拔刪除策略選擇哪些文件符合刪除條件,但是當前策略會刪除任何修改時間超過N天前的日誌,儘管保留最後N GB的政策也可能有用。「 (https://kafka.apache.org/documentation/#impl_deletes) – ftr
這是一個0.10.1的顯着變化https://kafka.apache.org/documentation/#upgrade_10_1_breaking – dawsaw