2016-01-09 137 views
4

我正在使用下面的代碼來讀取來自主題的消息。我面臨兩個問題。 每當我開始使用消費者時,它正在讀取隊列中的所有消息? 如何閱讀未讀郵件?卡夫卡python消費者開始時讀取所有消息

from kafka import KafkaConsumer 


consumer = KafkaConsumer('my-topic', 
         group_id='my-group', 
         bootstrap_servers=['localhost:9092']) 
for message in consumer: 
    consumer.commit() 
    # message value and key are raw bytes -- decode if necessary! 
    # e.g., for unicode: `message.value.decode('utf-8')` 
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, 
              message.offset, message.key, 
              message.value)) 
+0

我想你必須在閱讀完''consumer.commit()'後。 –

+0

感謝@KenjiNoguchi,我試着與consumer.commit(),但仍然無法正常工作。任何提示 – user3570620

回答

4

由於@Kenji說你必須承諾與consumer.commit()抵消。如果您不想手動提交,則可以通過將enable_auto_commit=True傳遞給您的KafkaConsumer來啓用自動提交。您可能還想調整auto_commit_interval_ms,這是每個自動提交之間的時間間隔(以毫秒爲單位)。看到這裏:http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html

+0

謝謝@ se7entyse7en我試着與consumer.commit(),但仍然無法正常工作。任何提示 – user3570620

+0

@ user3570620,也許這是有幫助的:http://stackoverflow.com/questions/36579815/kafka-python-how-do-i-commit-a-partition –