6

我面臨着一些嚴重的問題,試圖針對我的需求實施解決方案,關於KafkaConsumer(> = 0.9)。Kafka Consumer - 調查行爲

讓我們來想象一下,我有一個函數,它必須只讀取來自kafka主題的消息n

例如:getMsgs(5) - >獲取接下來的5個kafka消息。

所以,我有一個循環,如下所示:

for (boolean exit= false;!exit;) 
{ 
    Records = consumer.poll(200); 
    for (Record r:records) { 
     processRecord(r); //do my things 
     numMss++; 
     if (numMss==maximum) //maximum=5 
      exit=true; 
    } 
} 

考慮到這一點,問題是,poll()方法可以得到超過5級的消息。例如,如果它得到10條消息,我的代碼將永遠忘記其他5條消息,因爲卡夫卡會認爲它們已經被消耗。

我試圖commiting偏移,但似乎並沒有工作:

consumer.commitSync(Collections.singletonMap(partition, 
    new OffsetAndMetadata(record.offset() + 1))); 

即使有偏移結構,每當我再次啓動消費,它不會從6消息啓動(請記住,我只想要5條消息),但是從第11條(因爲第一次輪詢消耗了10條消息)。

有沒有解決方案,或者(可以肯定)我錯過了什麼?

在此先感謝!

回答

3

可以設置max.poll.records到無論你喜歡什麼樣的數字,至多你會在每次民意測驗中得到那麼多記錄。

對於您在此問題中陳述的用例,您不必自己明確提交補償。您可以將enable.auto.commit設置爲true,並將auto.offset.reset設置爲earliest,以便在沒有消費者時(即您第一次從分區開始讀取數據時)啓動它。一旦你有一個組。id和卡夫卡中存儲的一些消費者偏移量,並且如果您的卡夫卡消費者進程死亡,它將繼續從最後一次提交的偏移量開始,因爲它是默認行爲,因爲當消費者啓動時它將首先查找是否存在任何已確認的偏移量,將繼續從最近提交的偏移量和auto.offset.reset不會踢入。

0

將auto.offset.reset屬性設置爲「latest」。然後嘗試消耗,您將從提交的偏移量中獲取消耗的記錄。

或者您在poll之前使用consumer.seek(TopicPartition,offset)api。

+0

auto.offset.reset應該是最早的,它只有在沒有消費者group.id時纔開始使用。沒有組ID的人不能存儲偏移量。如果已經有一個消費者組ID,auto.offset.reset不會執行任何操作,默認情況下,消費者會從最後提交的偏移量中選擇。 – user1870400

0

如果您通過將enable.auto.commit設置爲false來禁用了自動提交功能。如果您想手動提交偏移量,則需要禁用該功能。如果沒有下一次調用poll(),將自動提交從前一輪詢()中收到的消息的最新偏移量。

0

從Kafka 0.9開始,auto.offset.reset參數名稱已更改;

怎麼辦時,沒有初始卡夫卡或者如果電流偏移不存在任何更多的服務器(例如因爲該數據已被刪除)偏移:

earliest: automatically reset the offset to the earliest offset 

latest: automatically reset the offset to the latest offset 

none: throw exception to the consumer if no previous offset is found for the consumer's group 

anything else: throw exception to the consumer. 
相關問題