2017-09-16 32 views
0

我有一個批量作業,每天觸發一次。要求是消費卡夫卡主題上的所有消息並斷開連接

  1. 消耗在卡夫卡主題可用的所有消息在該時間點
  2. 處理消息
  3. 如果該過程成功完成,提交的偏移量。

目前我輪詢()消息在while循環中,直到ConsumerRecords.isEmpty()爲true。當ConsumerRecords.isEmpty()爲true時,我假設當前在Topic上可用的所有記錄已被使用。應用程序維護偏移量並關閉卡夫卡消費者。

當消息處理完成併成功完成時,我創建一個新的KafkaConsumer並提交應用程序維護的偏移量。

注意我關閉最初用於讀取消息的KafkaConsumer,並使用另一個KafkaConsumer實例來提交偏移以避免消費者重新平衡異常。

我期待主題上最多5k條消息。該主題已分區並複製。

有什麼更好的方法來消費在特定時間點的所有消息主題?有什麼我失蹤或需要照顧?我不認爲我需要關注消費者重新平衡,因爲我輪詢()循環中的消息並在輪詢完成後處理消息。

我正在使用java kafka客戶端v0.9,如果在上述情況下有幫助,可以更改爲v0.10。

感謝

更新時間:

AtomicBoolean flag = new AtomicBoolean(); 
flag.set(true); 

while(flag.get()) { 

ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(timeout); 

if(consumerRecords.isEmpty()) { 
    flag.set(false); 
    continue; 
} 
    //if the ConsumerRecords is not empty process the messages and continue to poll() 
} 
kafkaConsumer.close(); 
+0

你能否澄清結果是什麼樣子?具體而言,是將結果附加到每日結果列表(如每日報告,獨立於其他日期的報告)還是與之前的結果結合使用(如通過減去當前銷售的總項目來更新庫存水平)?或者是其他東西? – Svend

+0

5K日常輸入事件的性質又是什麼?他們每天如何分配?例如,如果由於某些問題,批處理在過去3天內未成功執行,則在下一次成功執行期間,是否應該將15K消息分離爲每天單獨處理,或者計算是否接受以接收整組非 - 尚未處理的消息? – Svend

+0

@Svend該應用程序生成每日報告。日常輸入事件來自第三方系統。如果批處理在過去3天內未成功執行,則不必拆分消息,但可以接收所有尚未處理的消息。 –

回答

0

你不能假設一個電話後輪詢()您已經閱讀所有在那一刻的主題中提供的信息,由於max.poll 。記錄用戶的配置參數。這是單個poll()返回的記錄的最大數量,它的默認值是500.這意味着如果在那一刻在主題中有600條消息,則需要在poll()上進行兩次調用以讀取所有消息(但同時考慮到其他一些消息可能會到達)。 我不明白的另一件事是爲什麼你使用不同的消費者提交補償。您正在討論的消費者重新平衡異常是什麼?

+0

我不做一個民意調查()一次,我做了一個循環,正如我在問題中所說的。我用代碼更新了問題,以便更清楚。 –

+0

使用另一個KafkaConsumer提交補償的原因是,一旦你消費了消息,我停止輪詢並開始處理消息,這需要花費時間,而我用來消費消息的Kafka消費者不會發出心跳,並且會如果我嘗試使用相同的KafkaConsumer來提交偏移量,則會發生重新平衡。 因此,一旦消息是消費者,我關閉KafkaConsumer,處理消息,然後創建一個新的KafkaConsumer,只是在消息處理成功時提交偏移量。 –

+0

從Kafka 0.10.1.0開始,心跳被更改,現在它在後臺線程中執行,所以您可以停止輪詢很長時間,而不需要代理考慮客戶端死亡並離開消費者組。順便說一句,有一個新的超時名爲max.poll.interval.ms(默認5分鐘):你應該在5分鐘內重新調用輪詢。有關0.10.1.0發行說明的更多信息:https://kafka.apache.org/documentation/#upgrade_1010_notable – ppatierno