4
如果我有一個enable.auto.commit=false
和我打電話consumer.poll()
而不呼叫consumer.commitAsync()
後,爲什麼consumer.poll()
返回 下一次它的新記錄被稱爲?Consumer.poll()即使沒有提交補償也會返回新記錄?
由於我沒有提交我的抵消,我期望poll()
將返回最新的抵消,應該是相同的記錄再次。
我在問,因爲我試圖在處理過程中處理故障場景。我希望沒有提交抵消,poll()
會再次返回相同的記錄,所以我可以再次處理這些失敗的記錄。
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord record : records) {
try {
//process record
consumer.commitAsync();
} catch (Exception e) {
}
/**
If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception.
**/
}
}
}
}
有道理。但是[poll()doc](https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long))說:「最後消費的偏移量可以是通過seek(TopicPartition,long)手動設置或者自動設置爲訂閱的分區列表的最後提交偏移量「 後者是否執行我的問題所要求的 - 哪些將所消耗的偏移量設置爲最後提交的偏移量 - 如果我從未提交新的偏移量,它應該會導致poll()不返回新記錄。我的理解是否正確? – Glide
我認爲文件的一部分只涉及消費的起點/抵消點。因此,您可以通過使用搜索或使用提交的偏移量在任何地方開始。 – ftr
你的意思是它只會在第一次調用poll()時使用提交的偏移量? – Glide