2017-04-19 138 views
4

如果我有一個enable.auto.commit=false和我打電話consumer.poll()而不呼叫consumer.commitAsync()後,爲什麼consumer.poll()返回 下一次它的新記錄被稱爲?Consumer.poll()即使沒有提交補償也會返回新記錄?

由於我沒有提交我的抵消,我期望poll()將返回最新的抵消,應該是相同的記錄再次。

我在問,因爲我試圖在處理過程中處理故障場景。我希望沒有提交抵消,poll()會再次返回相同的記錄,所以我可以再次處理這些失敗的記錄。

public class MyConsumer implements Runnable { 
    @Override 
    public void run() { 
     while (true) { 
      ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE); 
      for (ConsumerRecord record : records) { 
       try { 
        //process record 
        consumer.commitAsync(); 
       } catch (Exception e) { 
       } 
       /** 
       If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception. 
       **/ 
      } 

     } 
    } 
} 

回答

3

輪詢的起始偏移量不是由代理人決定的,而是由消費者決定的。消費者追蹤最近收到的偏移量,並在下一輪投票期間詢問以下一組消息。

當消費者停止或失敗時偏移提交起作用,而另一個不知道最近消耗的偏移量的實例將消耗分區。

KafkaConsumer有相當廣泛的Javadoc值得一讀。

+1

有道理。但是[poll()doc](https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long))說:「最後消費的偏移量可以是通過seek(TopicPartition,long)手動設置或者自動設置爲訂閱的分區列表的最後提交偏移量「 後者是否執行我的問題所要求的 - 哪些將所消耗的偏移量設置爲最後提交的偏移量 - 如果我從未提交新的偏移量,它應該會導致poll()不返回新記錄。我的理解是否正確? – Glide

+1

我認爲文件的一部分只涉及消費的起點/抵消點。因此,您可以通過使用搜索或使用提交的偏移量在任何地方開始。 – ftr

+1

你的意思是它只會在第一次調用poll()時使用提交的偏移量? – Glide

相關問題