2016-07-26 64 views
2

我們剛開始使用Kafka作爲我們的項目。我們正在使用kafka_2.11-0.9.0.0。我有一些與KafkaConsumer有關的疑問。卡夫卡消費者調查和重新連接

1)在啓動Zookeeper和Kafka服務器之前,我開始了Kafka Consumer,但我的KafkaConsumer客戶端仍然能夠連接。我有以下的代碼

Consumer<String, String> consumer = new KafkaConsumer<String,String>(props); 
    consumer.subscribe(getConsumerRegisteredTopics()); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
     for (ConsumerRecord<String, String> record : records){ 
      processRecord (record) 
     } 
    } 

2)I讀,動物園管理員通過使用輪詢(長超時)方法調用的保持活性的消費者軌跡線。如果我使用Long.MAX_VALUE在poll()中有超時,zookeeper如何跟蹤我的消費者。您能否幫我理解KafkaConsumer民意調查的行爲。

在此先感謝。

回答

1

1)如果您在啓動消費者之前沒有啓動zookeeper和kafka,它將無法連接,但會嘗試從kafka讀取元數據。我的經驗是,KafkaConsumer的「調查」調用會在未能連接並讀取元數據之前被阻塞。換句話說......你的消費者並沒有真正連接,但正在等待kafka集羣出現。

2)輪詢超時告訴消費者等待多久才能返回任何數據。您必須確保輪詢結束後您再次調用輪詢,以便消費者保持活動狀態。投票調用的超時時間與Kafka消費者的存活機制無關(這由消費者的消費者屬性屬性控制)。

+0

感謝您的回覆。如果我的客戶以輪詢方式等待Long.MAX_VALUE,它將如何發送心跳,以及Kafka服務器/ Zookeeper如何知道我的客戶仍然活着。 – user1874156

+0

心跳在那裏,以確保您的應用程序代碼是活着的。所以只要程序中的控制流程在輪詢方法內部,您就不必擔心心跳。 –