我可以尋求一個特定的抵消。有沒有辦法阻止消費者在特定的偏移量?換句話說,消費,直到我給定的抵消。據我所知,卡夫卡不提供這樣的功能。如果我錯了,請糾正我。有沒有辦法阻止特定偏移量的Kafka消費者?
例如,分區有偏移1-10。我只想從3-8消耗。消耗第8條消息後,程序應該退出。
我可以尋求一個特定的抵消。有沒有辦法阻止消費者在特定的偏移量?換句話說,消費,直到我給定的抵消。據我所知,卡夫卡不提供這樣的功能。如果我錯了,請糾正我。有沒有辦法阻止特定偏移量的Kafka消費者?
例如,分區有偏移1-10。我只想從3-8消耗。消耗第8條消息後,程序應該退出。
是的,kafka不提供此功能,但您可以在消費者代碼中實現此功能。您可以嘗試使用commitSync()
來控制此操作。
公共無效commitSync(地圖偏移)
提交指定的主題和分區的列表中指定的偏移量。 這將抵消卡夫卡。使用此API提交的偏移量將在每次重新平衡之後的第一次獲取時以及啓動時使用。因此,如果您需要將偏移量存儲在Kafka以外的其他位置,則不應使用此API。提交的偏移量應該是應用程序將消耗的下一條消息,即lastProcessedMessageOffset + 1.
這是一個同步提交,並會阻塞,直到提交成功或遇到不可恢復的錯誤(在這種情況下,它將被拋出呼叫者)。
事情是這樣的:
while (goAhead) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
if (record.offset() > OFFSET_BOUND) {
consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())));
goAhead = false;
break;
}
process(record);
}
}
你應該設置 「enable.auto.commit」 假在上面的代碼。在你的情況下,OFFSET_BOUND可以設置爲8.因爲在你的例子中提交的偏移量僅爲9,所以下次消費者將從這個位置獲取。
假設分區偏移是連續的(即不是日誌壓縮),您可以配置您的使用者(使用max.poll.records
config),以便它在每個輪詢中讀取一定數量的記錄。這可以讓你停止在你想要的偏移量。