2017-02-01 83 views
3

我們開始一個Kafka消費者,傾聽一個可能尚未創建的主題(儘管主題自動創建已啓用)。爲什麼卡夫卡消費者需要很長時間才能開始消費?

此後不久,生產者正在發佈有關該主題的消息。

但是,消費者需要花費一些時間通知這需要5分鐘。此時消費者撤銷其分區並重新加入消費者羣體。卡夫卡重新穩定了這個組織。查看消費者與卡夫卡日誌的時間戳,這個過程在消費者端實例化。

我想這是預期的行爲,但我想了解這一點。這實際上是重新平衡(從0到1分區)嗎?如果我們預先創建主題,這是否會發生?

2017-02-01 08:36:45.692 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group tps-kafka-partitioning 
2017-02-01 08:36:45.692 INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2017-02-01 08:36:45.693 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group tps-kafka-partitioning 
2017-02-01 08:36:45.738 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group tps-kafka-partitioning with generation 1 
2017-02-01 08:36:45.747 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [] for group tps-kafka-partitioning 
2017-02-01 08:36:45.749 INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[] 
2017-02-01 08:41:45.540 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [] for group tps-kafka-partitioning 
2017-02-01 08:41:45.544 INFO 7 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[] 
2017-02-01 08:41:45.544 INFO 7 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group tps-kafka-partitioning 

卡夫卡記錄

[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-partitioning with old generation 1 (kafka.coordinator.GroupCoordinator) 
[2017-02-01 08:41:45,546] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-partitioning generation 2 (kafka.coordinator.GroupCoordinator) 
[2017-02-01 08:41:45,551] INFO [GroupCoordinator 1001]: Assignment received from leader for group tps-kafka-partitioning for generation 2 (kafka.coordinator.GroupCoordinator) 
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Preparing to restabilize group tps-kafka-group-id with old generation 1 (kafka.coordinator.GroupCoordinator) 
[2017-02-01 08:42:14,636] INFO [GroupCoordinator 1001]: Stabilized group tps-kafka-group-id generation 2 (kafka.coordinator.GroupCoordinator) 

回答

4

這可能是由於參數的默認值metadata.max.age.ms控制消費者如何常常迫使元數據的刷新的話題。

當你用一個不存在的主題啓動消費者時,會發生什麼情況是代理自動創建這個主題,但是這需要一點點時間用於領導選舉等,所以當你的消費者請求該主題的元數據時, LEADER_NOT_AVAILABLE警告並且無法獲取任何消息。 在達到上述超時之後,消費者刷新元數據,這次成功地開始閱讀消息。這不依賴於生產者寫信息的主題,它純粹是消費者的事情。

如果您啓動消費者,例如1000毫秒超時,您應該看到一個更短的延遲,直到消息被消耗。另外,如果您事先創建了主題,或者在消費者之前啓動了製作者,則此行爲根本不應該發生。