我面臨着一些嚴重的問題,試圖針對我的需求實施解決方案,關於KafkaConsumer(> = 0.9)。Kafka Consumer - 調查行爲
讓我們來想象一下,我有一個函數,它必須只讀取來自kafka主題的消息n。
例如:getMsgs(5)
- >獲取接下來的5個kafka消息。
所以,我有一個循環,如下所示:
for (boolean exit= false;!exit;)
{
Records = consumer.poll(200);
for (Record r:records) {
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
exit=true;
}
}
考慮到這一點,問題是,poll()方法可以得到超過5級的消息。例如,如果它得到10條消息,我的代碼將永遠忘記其他5條消息,因爲卡夫卡會認爲它們已經被消耗。
我試圖commiting偏移,但似乎並沒有工作:
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
即使有偏移結構,每當我再次啓動消費,它不會從6消息啓動(請記住,我只想要5條消息),但是從第11條(因爲第一次輪詢消耗了10條消息)。
有沒有解決方案,或者(可以肯定)我錯過了什麼?
在此先感謝!
auto.offset.reset應該是最早的,它只有在沒有消費者group.id時纔開始使用。沒有組ID的人不能存儲偏移量。如果已經有一個消費者組ID,auto.offset.reset不會執行任何操作,默認情況下,消費者會從最後提交的偏移量中選擇。 – user1870400