0
我已經建立了以下卡夫卡消費者之間運行組:卡夫卡0.90消費者堅持
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6667");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
this.kconsumer = new KafkaConsumer(props);
我想消費者來說,啓動時啓動與最早的這一組。所以我第一次運行它,它可以像預期的那樣完美地工作。只要訂閱存在且連接未關閉,它就會繼續增加偏移量。
當我登錄到卡夫卡和運行以下命令:偏移
./kafka-consumer-groups.sh --bootstrap-server localhost:6667 --new-consumer --group TEST1 --describe
我看看到底是什麼預期,同比增長等等。當連接被關閉,但在運行「消費者相同的命令結果TEST1
組不存在或正在重新平衡。「只是它不是重新平衡,它已經消失了。
當消費者沒有運行時,我該如何堅持組的存在?我錯過了消費者或卡夫卡的配置嗎?
作爲另一個說明,當我將OFFSET參數更改爲「latest」時,除非新記錄加載,即使記錄未過期,我也不會得到任何記錄。
因此,我想要做的就是創建一個新名稱的消費者,能夠從最早的可用記錄中提取,關閉該消費者,如果我以該名稱啓動消費者再次從我離開的地方拉開。任何想法我缺少什麼?或者我只是誤解了高層次消費者應該如何工作?
我發現如果我將OFFSET更改爲最新我正在獲得所需的結果。但是,我如何檢查該組是否曾經存在?因爲如果我將OFFSET設置爲最新時間,那麼該組從未生成過,它將不返回任何記錄。因此,我認爲我需要一件事情,當它是新的和其他以前使用。 –
你禁用自動提交 - 你手動提交?如果您沒有提交,卡夫卡無法知道您何時停止您的消費者。 –
我做手動提交是的。因此,在第二次運行消費者時,將OFFSET改變爲最新的工作。 –