我有一個批量作業,每天觸發一次。要求是消費卡夫卡主題上的所有消息並斷開連接
- 消耗在卡夫卡主題可用的所有消息在該時間點
- 處理消息
- 如果該過程成功完成,提交的偏移量。
目前我輪詢()消息在while循環中,直到ConsumerRecords.isEmpty()爲true。當ConsumerRecords.isEmpty()爲true時,我假設當前在Topic上可用的所有記錄已被使用。應用程序維護偏移量並關閉卡夫卡消費者。
當消息處理完成併成功完成時,我創建一個新的KafkaConsumer並提交應用程序維護的偏移量。
注意我關閉最初用於讀取消息的KafkaConsumer,並使用另一個KafkaConsumer實例來提交偏移以避免消費者重新平衡異常。
我期待主題上最多5k條消息。該主題已分區並複製。
有什麼更好的方法來消費在特定時間點的所有消息主題?有什麼我失蹤或需要照顧?我不認爲我需要關注消費者重新平衡,因爲我輪詢()循環中的消息並在輪詢完成後處理消息。
我正在使用java kafka客戶端v0.9,如果在上述情況下有幫助,可以更改爲v0.10。
感謝
更新時間:
AtomicBoolean flag = new AtomicBoolean();
flag.set(true);
while(flag.get()) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(timeout);
if(consumerRecords.isEmpty()) {
flag.set(false);
continue;
}
//if the ConsumerRecords is not empty process the messages and continue to poll()
}
kafkaConsumer.close();
你能否澄清結果是什麼樣子?具體而言,是將結果附加到每日結果列表(如每日報告,獨立於其他日期的報告)還是與之前的結果結合使用(如通過減去當前銷售的總項目來更新庫存水平)?或者是其他東西? – Svend
5K日常輸入事件的性質又是什麼?他們每天如何分配?例如,如果由於某些問題,批處理在過去3天內未成功執行,則在下一次成功執行期間,是否應該將15K消息分離爲每天單獨處理,或者計算是否接受以接收整組非 - 尚未處理的消息? – Svend
@Svend該應用程序生成每日報告。日常輸入事件來自第三方系統。如果批處理在過去3天內未成功執行,則不必拆分消息,但可以接收所有尚未處理的消息。 –