2016-07-22 37 views
0

我正在使用Kafka 0.9.0.1。Kafka:從主題消耗第一條消息時間歇性緩慢

我第一次開始了我的應用程序需要20-30秒內從

我用不同的卡夫卡經紀人(具有不同CONFIGS)專題檢索「最新」消息,但我仍然看到此行爲。隨後的消息通常沒有緩慢。

這是預期的行爲?你可以清楚地看到以下運行此示例應用程序,改變經紀/主題名稱你自己的設置

public class KafkaProducerConsumerTest { 

    public static final String KAFKA_BROKERS = "..."; 
    public static final String TOPIC = "..."; 

    public static void main(String[] args) throws ExecutionException, InterruptedException { 
     new KafkaProducerConsumerTest().run(); 
    } 

    public void run() throws ExecutionException, InterruptedException { 
     Properties consumerProperties = new Properties(); 
     consumerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS); 
     consumerProperties.setProperty("group.id", "Test"); 
     consumerProperties.setProperty("auto.offset.reset", "latest"); 
     consumerProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     MyKafkaConsumer kafkaConsumer = new MyKafkaConsumer(consumerProperties, TOPIC); 
     Executors.newFixedThreadPool(1).submit(() -> kafkaConsumer.consume()); 

     Properties producerProperties = new Properties(); 
     producerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS); 
     producerProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     producerProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

     MyKafkaProducer kafkaProducer = new MyKafkaProducer(producerProperties, TOPIC); 
     kafkaProducer.publish("Test Message"); 
    } 
} 


class MyKafkaConsumer { 
    private final Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class); 
    private KafkaConsumer<String, Object> kafkaConsumer; 

    public MyKafkaConsumer(Properties properties, String topic) { 
     kafkaConsumer = new KafkaConsumer<String, Object>(properties); 
     kafkaConsumer.subscribe(Lists.newArrayList(topic)); 
    } 

    public void consume() { 
     while (true) { 
      logger.info("Started listening..."); 
      ConsumerRecords<String, Object> consumerRecords = kafkaConsumer.poll(Long.MAX_VALUE); 
      logger.info("Received records {}", consumerRecords.iterator().next().value()); 
     } 
    } 
} 

class MyKafkaProducer { 
    private KafkaProducer<String, Object> kafkaProducer; 
    private String topic; 

    public MyKafkaProducer(Properties properties, String topic) { 
     this.kafkaProducer = new KafkaProducer<String, Object>(properties); 
     this.topic = topic; 
    } 

    public void publish(Object object) throws ExecutionException, InterruptedException { 
     ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, "key", object); 
     Future<RecordMetadata> response = kafkaProducer.send(producerRecord); 
     response.get(); 
    } 

} 

回答

1

,因爲當你指定的消費羣體開始一個新的消費者的第一條消息應該需要更長的時間比其他人通過consumerProperties.setProperty("group.id", "Test");聲明,Kakfka將平衡這些分區,使得每個分區最多被一個消費者使用,並將跨多個消費者進程分配該主題的分區。

此外,卡夫卡0.9還有一個單獨的__consumer_offsets主題,卡夫卡用它來管理消費羣中每個消費者的偏移量。很可能當你第一次啓動消費者時,它會查看這個主題來獲取最新的偏移量(可能有一個消費者正在消耗這個話題,而這個消費者會被殺死,因此有必要從正確的偏移量)。

這2個因素會導致消耗第一組消息的延遲更高。我無法評論20-30秒的確切延遲,但我想這應該是默認行爲。

PS:準確的數字還可能取決於其他次要因素,例如您是在同一臺機器上(不存在網絡延遲)的消費者運行代理&,還是在使用TCP進行通信的不同的次要因素。

0

現在很多時候只用最少的日誌記錄添加就試過了你的代碼。以下是一個典型的日誌輸出:

2016-07-24 15:12:51,417 Start polling...|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,604 producer has send message|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,619 producer got response, exiting|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,679 Received records [Test Message]|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:51,679 Start polling...|INFO|KafkaProducerConsumerTest 
2016-07-24 15:12:54,680 returning on empty poll result|INFO|KafkaProducerConsumerTest 

事件的順序如預期的那樣及時。消費者開始投票,生產者發送消息並接收結果,消費者接收消息並且以300ms完成所有這些。然後消費者再次開始輪詢,並在3秒後拋出,因爲我分別更改輪詢超時。

我爲經紀人和客戶端庫使用Kafka 0.9.0.1。連接在localhost上,它是一個沒有負載的測試環境。

爲了完整起見,這裏是上面交換機觸發的服務器的日誌形式。

[2016-07-24 15:12:51,599] INFO [GroupCoordinator 0]: Preparing to restabilize group Test with old generation 0 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:12:51,599] INFO [GroupCoordinator 0]: Stabilized group Test generation 1 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:12:51,617] INFO [GroupCoordinator 0]: Assignment received from leader for group Test for generation 1 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:13:24,635] INFO [GroupCoordinator 0]: Preparing to restabilize group Test with old generation 1 (kafka.coordinator.GroupCoordinator) 
[2016-07-24 15:13:24,637] INFO [GroupCoordinator 0]: Group Test generation 1 is dead and removed (kafka.coordinator.GroupCoordinator) 

您可能想要與您的服務器日誌進行同一交換的比較。

+0

感謝您的嘗試,我斷斷續續地看到了這種瞬間行爲,但如果您沒有請嘗試幾次,您應該看到延遲。另外,我很欣賞你的理論,但是我在「開始聆聽」之後還發布了一秒鐘的消息,而且還需要20秒左右 – DJ180

0

根據this link

嘗試在你的消費者設置group_id=None,或致電consumer.close() 結束腳本,或使用分配()之前不同意()。否則,您將 重新加入已知但成員沒有響應的現有組。 組協調員將等待,直到那些成員簽入/離開/超時。 由於消費者不再存在(它是您先前的腳本運行),因此它們有 超時。 和consumer.poll()會在組重新平衡期間阻塞。

因此,如果您與無響應的成員加入羣組(可能會不正常地終止應用程序),這是正確的行爲。

請確認您在退出應用程序之前調用「consumer.close()」。