2016-10-12 69 views
2

是否有可能從指定的偏移量啓動0.9或0.10卡夫卡消費者,同時仍然使用具有動態重新平衡功能的消費羣體?如何在使用動態組重新平衡時指定卡夫卡消費者的起始位置?

這裏是迄今發現:

案例1:如果我們使用consumer.assign(...)方法來手動指定分區的消費者 - 我們能做到以下操作:

consumer.seek(<specificPartition>, <myCustomOffset>); or: 
consumer.seekToBeginning(<specificPartition>); 
consumer.seekToEnd(<specificPartition>); 

基本上,我們可以完全控制啓動消費者表單的哪個位置,但是它的代價是沒有由卡夫卡動態地完成分區重新分配。情況2:如果我們使用consumer.subscribe(...)方法 - 卡夫卡將管理重新平衡,但是,我們不能做任何的以上三個選項... :( 因此,我們嘗試了以下爲「黑客」周圍 - 在消費者的啓動時間,前進入投票()循環:

// get coordinator from the private field of the consumer: 
ConsumerCoordinator coordinator = (ConsumerCoordinator) FieldUtils.readField(consumer, "coordinator", true); 
// make sure all partitions are already 
coordinator.ensurePartitionAssignment(); 
// get the list of partitions assigned to this specific consumer: 
Set<TopicPartition> assignedTopicPartitions = consumer.assignment() 
// now we can go ahead and do the same three actions (seek(), sequined() or seekToBeginning()) on those partitions only for this consumer as above. 
for (TopicPartition assignedPartition: assignedTopicPartitions) { 
    consumer.seek(<assignedPartition>, <myCustomOffset>) // or whatever 
... 
} 
// now start the poll() loop: 
while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(pollIntervalMs); 
    for (ConsumerRecord<String, String> record : records) { 
     // processMessage(record.value(), record.offset()); 
    } 
} 

這對我的品味來說太過黑客,而且,我也不確定這種邏輯是否會在實際的重新平衡過程中持續下去,比如新的消費者被加入到羣組中。

有人可以驗證這種方法或建議一個更好的方法來完成我們需要的東西嗎?

謝謝!

回答

2

而不是使用ConsumerCoordinator你可以做一個初始的poll()(並且不處理任何東西)來分配分區。之後,使用seek()並開始您的輪詢循環,如代碼中所示。

+0

感謝您的想法!然而,AFAIK,poll()將檢索事件,而不管我是否實際遍歷它們 - 並且Kafka將在某個點保存偏移量....所以,除非我還添加顯式偏移量提交併禁用Kafka的自動提交 - 例如,如果我做seekToEnd(),我可能會從第一次poll()中釋放事件。這是你的建議 - 添加手動偏移提交?謝謝! – Marina

+0

如果你想有更多的控制權,手動提交可以工作。但是,我不確定爲什麼會「鬆散」數據。只要保留時間足夠大,您總是可以回頭 - 如果您想seekToEnd(),則無論如何都要跳過數據。或者我錯過了什麼?請記住,承諾的抵消額不一定增加。例如,如果處理數據並提交偏移量10,則回溯到偏移量5,然後您仍然可以提交5。提交的偏移量僅確定在默認情況下關閉/失敗時要繼續進行的操作(即,無需手動查找) –

+0

您是正確的 - 好,我沒有闡明我的用例:with seekToEnd()或seekToBeginning( )或seek() - 沒有處理來自第一輪詢事件()的問題。我想到的使用案例是,對於* some *分區,我希望從他們完成的任何偏移量開始執行RESTART,而對於某些情況 - 執行自定義偏移量重新啓動。我認爲在這種情況下,不要忽略處理使用正常RESTART選項的分區/使用者中的任何消息的唯一方法是明確提交偏移量。再次感謝您的快速回答! – Marina