我正在使用下面的代碼來讀取來自主題的消息。我面臨兩個問題。 每當我開始使用消費者時,它正在讀取隊列中的所有消息? 如何閱讀未讀郵件?卡夫卡python消費者開始時讀取所有消息
from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
consumer.commit()
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
我想你必須在閱讀完''consumer.commit()'後。 –
感謝@KenjiNoguchi,我試着與consumer.commit(),但仍然無法正常工作。任何提示 – user3570620