2015-04-22 67 views

回答

12

你可以在zookeeper shell的幫助下做到這一點。卡夫卡使用動物園管理員來追蹤消費者偏移量。

轉到卡夫卡bin目錄,並調用飼養員殼。(我的卡夫卡的版本是0.8.0)

./zookeeper-shell.sh localhost:2181 

現在使用的動物園管理員get命令

get /consumers/consumer_group_id/offsets/topic/0 

它表明類似

2043 
cZxid = 0x4d 
ctime = Wed Mar 18 03:56:32 EDT 2015 
... 

這裏2043是消耗的最大偏移量。通過使用動物園管理員將其設置爲所希望的值設定命令

set /consumers/consumer_group_id/offsets/topic/0 10000 

路徑尚未這樣/消費者/ [consumer_group_id] /偏移/ [主題]/[partition_id時]。
您將不得不用適當的消費者組,主題和分區ID來替換。

*此外,既然您提到這是一個新的卡夫卡實例,我不確定消費者是否會建立連接並創建消費羣。

+0

好的答案,這對我的項目真的有幫助! –

1

由於kafka 0.9偏移被存儲在一個主題中。要改變偏移,使用seek() method

public void seek(TopicPartition partition, long offset) 

覆蓋的獲取,消費者將在下一個poll(timeout)使用偏移。如果多次爲同一分區調用此API,則將在下一個poll()中使用最新的偏移量。請注意,如果此API在消費過程中被任意使用,您可能會丟失數據,以重置提取偏移量