我有以下程序來消耗所有來到Kafka的消息。如何關閉卡夫卡消費者一旦消費完所有消息?
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_test_topic',
group_id='my-group',
bootstrap_servers=['my_kafka:9092'])
for message in consumer:
consumer.commit()
print ("%s key=%s value=%s" % (message.topic,message.key,
message.value))
KafkaConsumer.close()
使用上面的程序,我能夠消耗所有來到卡夫卡的消息。但是,一旦所有消息都消耗完了,我想關閉不在發生的卡夫卡消費。我需要幫助。
我試過使用consumer.close(),但它似乎沒有出來的for循環本身執行close()。可能是我想念什麼不知道什麼。 – pankmish
我認爲你的循環將會無限期地等待更多的消息。你必須給它一個退出條件才能結束。 – dawsaw