2016-02-11 134 views
1

我正在嘗試使用Java API監控給定組的消費者偏移量。我創建了一個不訂閱任何主題的額外消費者,但只需撥打consumer.committed(topic)即可獲取抵消信息。這種作品,但:卡夫卡0.9新消費者api ---如何看消費者偏移

爲了測試我只使用一個真正的消費者(即訂閱該主題的消費者)。當我使用close()將其關閉並稍後重新啓動時,儘管我使用poll(1000),但在訂閱和第一次使用郵件之間需要27秒。

我猜這與重新平衡有可能被非訂閱消費者混淆。這可能嗎?有沒有更好的方法來監控Java API的偏移量(我知道命令行工具,但需要使用API​​)。

回答

1

有不同的方法來檢查從主題偏移,取決於目的你想要的它,除了的「承諾」,你如上所述,這裏有兩個更多的選擇:如果你想

1)要知道偏移ID從消費者開始來從經紀人下一次線程(S)啓動(S)的數據,則必須使用「位置」作爲

long offsetPosition; 
TopicPartition tPartition = new TopicPartition(topic,partitionToReview); 
    offsetPosition = kafkaConsumer.position(tPartition); 
    System.out.println("offset of the next record to fetch is : " + position); 

2)調用「偏移()」方法來自ConsumerRecord對象,在執行了來自kafkaConsumer的民意調查之後

Iterator<ConsumerRecord<byte[],byte[]>> it = kafkaConsumer.poll(1000).iterator(); 
while(it.hasNext()){ 
ConsumerRecord<byte[],byte[]> record = it.next(); 
System.out.println("offset : " + record.offset()); 
} 
+0

這兩種方法僅適用於訂閱的消費者。重點是我有一個消費者只是監控。它不會參與消費,所以它不能使用這些方法。 – Harald

1

發現它:監控消費者增加了困惑,但不是罪魁禍首。最後很容易理解,但有點意外(至少對我來說):

session.timeout.ms的默認值是30秒。當消費者消失時,需要長達30秒的時間纔會被宣佈死亡,並且工作會重新平衡。爲了測試,我停止了我的單一消費者,等待了三秒鐘,然後重新啓動了一個新消費者。然後在它開始前27秒,填滿30秒超時。

我原本以爲一個單獨的消費者啓動不會等待超時過期,而是開始「重新平衡」,即立即抓住工作。看起來,即使只有一個消費者,工作重新平衡之前,超時也必須到期。

爲了讓測試更快地完成,我將配置更改爲對代理使用較低的session.timeout.ms,對於代理使用較低的session.timeout.ms以及group.min.session.timeout.ms

總結:使用未訂閱任何主題的消費者來監控偏移量工作得很好,似乎不會干擾再平衡過程。