2017-04-17 234 views
2
#!/bin/zsh 
zk_servers=('10.138.0.8' '10.138.0.9' '10.138.0.16') 
kafka_servers=('10.138.0.13:9092' '10.138.0.14:9092') 
topics=('t1' 't2' 't1_failed' 't2_failed') 


NORMAL=$(tput sgr0) 
GREEN=$(tput setaf 2; tput bold) 
YELLOW=$(tput setaf 3) 
RED=$(tput setaf 1) 

function red() { 
    echo -e "$RED$*$NORMAL" 
} 

function green() { 
    echo -e "$GREEN$*$NORMAL" 
} 

function yellow() { 
    echo -e "$YELLOW$*$NORMAL" 
} 



for topic in $topics; do 
    yellow "Cleaning up messages in topic @ " $topic 
    yellow "==============================================================" 
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --config retention.ms=100 
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic 
done 

red "Waiting 120 seconds for messages to expire" 
sleep 120 

for topic in $topics; do 
    green "Restoring config of topic @ " $topic                 
    green "==============================================================" 
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --alter --topic $topic --delete-config retention.ms     
    $KAFKA/kafka-topics.sh --zookeeper $zk_servers --describe --topic $topic 
    $KAFKA/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $kafka_servers --topic $topic 
done 

當我運行此腳本時 - 我可以看到config.retention.ms已更改爲100ms,但延遲120秒後 - 我仍然在所有kafka主題中看到相同的消息。卡夫卡的retention.ms沒有被卡夫卡0.10.2強制執行?

那麼如何清除郵件?

感謝, 梅德

回答

3

你必須等待log.retention.check.interval.ms默認爲5分鐘。

5

它比接受的答案還多一點。卡夫卡將消息存儲在文件系統上的日誌文件中。這些文件具有翻轉(由時間或大小配置)。一旦文件不再是當前文件,Kafka將不再追加到該文件。

現在的樂趣部分:卡夫卡不會過期的個人消息。一旦該文件中消息的最高時間戳比retention.ms舊,它將(對於非緊湊主題)刪除整個日誌文件。保留時間告訴你至少至少可用的消息,但它可能可用的時間更長(取決於翻轉配置和消息量)。

在較早的Kafka版本中,這不是基於消息時間戳,而是基於對日誌文件的寫入訪問。感謝@dawsaw指出這一點。

+0

此處描述的行爲適用於0.10.2之前的版本。上次修改時間不再使用。而是使用時間索引。 – dawsaw

+0

你碰巧有鏈接或KIP嗎?目前的文檔仍然說「數據一次被刪除一個日誌段,日誌管理器允許可插拔刪除策略選擇哪些文件符合刪除條件,但是當前策略會刪除任何修改時間超過N天前的日誌,儘管保留最後N GB的政策也可能有用。「 (https://kafka.apache.org/documentation/#impl_deletes) – ftr

+0

這是一個0.10.1的顯着變化https://kafka.apache.org/documentation/#upgrade_10_1_breaking – dawsaw