2017-07-17 137 views
0

我有以下程序來消耗所有來到Kafka的消息。如何關閉卡夫卡消費者一旦消費完所有消息?

from kafka import KafkaConsumer 

consumer = KafkaConsumer('my_test_topic', 
         group_id='my-group', 
         bootstrap_servers=['my_kafka:9092']) 
for message in consumer: 
    consumer.commit() 
    print ("%s key=%s value=%s" % (message.topic,message.key, 
              message.value)) 
KafkaConsumer.close() 

使用上面的程序,我能夠消耗所有來到卡夫卡的消息。但是,一旦所有消息都消耗完了,我想關閉不在發生的卡夫卡消費。我需要幫助。

回答

0

如果我向KafkaConsumer對象提供consumer_timeout_ms參數,我現在可以關閉卡夫卡消費者。它接受以毫秒爲單位的超時值。 以下是代碼片段。

from kafka import KafkaConsumer 

consumer = KafkaConsumer('my_test_topic', 
         group_id='my-group', 
         bootstrap_servers=['my_kafka:9092'], 
         consumer_timeout_ms=1000) 
for message in consumer: 
    consumer.commit() 
    print ("%s key=%s value=%s" % (message.topic,message.key, 
              message.value)) 
KafkaConsumer.close() 

在上面的代碼中,如果消費者在1秒內沒有看到任何消息,它將關閉會話。

0

它看起來像你想要consumer.close()而不是KafkaConsumer.close()。它沒有被記錄爲靜態方法。

+0

我試過使用consumer.close(),但它似乎沒有出來的for循環本身執行close()。可能是我想念什麼不知道什麼。 – pankmish

+0

我認爲你的循環將會無限期地等待更多的消息。你必須給它一個退出條件才能結束。 – dawsaw