2016-05-23 210 views
0

我已經建立了以下卡夫卡消費者之間運行組:卡夫卡0.90消費者堅持

Properties props = new Properties(); 
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6667"); 
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST1"); 
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); 
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); 
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); 
this.kconsumer = new KafkaConsumer(props); 

我想消費者來說,啓動時啓動與最早的這一組。所以我第一次運行它,它可以像預期的那樣完美地工作。只要訂閱存在且連接未關閉,它就會繼續增加偏移量。

當我登錄到卡夫卡和運行以下命令:偏移

./kafka-consumer-groups.sh --bootstrap-server localhost:6667 --new-consumer --group TEST1 --describe 

我看看到底是什麼預期,同比增長等等。當連接被關閉,但在運行「消費者相同的命令結果TEST1組不存在或正在重新平衡。「只是它不是重新平衡,它已經消失了。

當消費者沒有運行時,我該如何堅持組的存在?我錯過了消費者或卡夫卡的配置嗎?

作爲另一個說明,當我將OFFSET參數更改爲「latest」時,除非新記錄加載,即使記錄未過期,我也不會得到任何記錄。

因此,我想要做的就是創建一個新名稱的消費者,能夠從最早的可用記錄中提取,關閉該消費者,如果我以該名稱啓動消費者再次從我離開的地方拉開。任何想法我缺少什麼?或者我只是誤解了高層次消費者應該如何工作?

+0

我發現如果我將OFFSET更改爲最新我正在獲得所需的結果。但是,我如何檢查該組是否曾經存在?因爲如果我將OFFSET設置爲最新時間,那麼該組從未生成過,它將不返回任何記錄。因此,我認爲我需要一件事情,當它是新的和其他以前使用。 –

+0

你禁用自動提交 - 你手動提交?如果您沒有提交,卡夫卡無法知道您何時停止您的消費者。 –

+0

我做手動提交是的。因此,在第二次運行消費者時,將OFFSET改變爲最新的工作。 –

回答

1

萬一有人遇到這種情況,並想知道我做了什麼。在確定組是否存在之後,我可以設置偏移量。這樣做意味着如果該組存在使用「最新」。如果不是,請使用「最早的」。

private void buildConsumer(String offset) 
    { 
     Properties props = new Properties(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6667"); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); 
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); 
     this.kconsumer = new KafkaConsumer(props); 
    } 

    /* 
    Check if the group exists before polling. 
    If it does, leave with default offset. 
    If it does not exists, set the offset to earliest to ensure you are getting all the records 
    */ 
    private void groupExists(String topic) 
    { 
     TopicPartition toc = new TopicPartition(topic, 0); 
     OffsetAndMetadata oam = kconsumer.committed(toc); 
     if(oam != null){ 
      //do nothing, all is well, start from last commit 
     } else { 
      /* 
      when a new group is started the AUTO_OFFSET_RESET_CONFIG 
      needs to be set to earliest to ensure all records are picked up 
      Since that property can only be set at instantiation the consumer 
      must be rebuilt and resubscribed 
      */ 
      buildConsumer("earliest"); 
      this.kconsumer.subscribe(Arrays.asList(topic)); 
     } 
    }