我對kafka和kafka-python來說相當陌生。在安裝kafka-python之後,我試着從這裏簡單地實現了用戶代碼 - http://kafka-python.readthedocs.io/en/master/usage.htmlkafka-python消費者給出的錯誤
我一直在用kafka的bin目錄編寫用戶代碼,並嘗試從那裏運行python代碼。不過,我得到以下錯誤:
Traceback (most recent call last): File "KafkaConsumer.py", line 4, in for message in consumer: File "/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py", line 559, in next return type(self).next(self) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 915, in next return next(self._iterator) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 876, in _message_generator for msg in self._fetcher: File "/usr/local/lib/python2.7/dist-packages/kafka/vendor/six.py", line 559, in next return type(self).next(self) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 520, in next return next(self._iterator) File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 477, in _message_generator for msg in self._unpack_message_set(tp, messages): File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 372, in _unpack_message_set inner_mset = msg.decompress() File "/usr/local/lib/python2.7/dist-packages/kafka/protocol/message.py", line 121, in decompress assert has_snappy(), 'Snappy decompression unsupported' AssertionError: Snappy decompression unsupported
這是我一直在試圖運行代碼:
from kafka import KafkaConsumer
consumer = KafkaConsumer ('mytopic',bootstrap_servers = ['localhost:9092'], group_id='test-consumer-group')
print "Consuming messages from the given topic"
for message in consumer:
print("%s:%d%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
因爲,我真的很新的卡夫卡,我無法理解我做錯了什麼。
你好,感謝您對本建議。有效。但是,現在突然間代碼被卡在「從給定主題消費消息」上。我不知道爲什麼它不再打印數據。沒有做任何改變,它突然停止工作。 –
這似乎是一個單獨的問題。有沒有未消耗的消息留在主題中消費?您的KafkaConsumer會跟蹤其消耗的消息,並且不會消耗已消耗的消息。在循環之前返回開始調用'consumer.seek_to_beginning()'。 – xli
感謝您的建議。我在循環之前嘗試了consumer.seek_to_beginning(),但是拋出了一個斷言錯誤。所以,我剛剛刪除了kafka主題,訂閱了我正在查找的數據並再次啓動了kafka。目前,這工作。我將嘗試看看如何更改代碼,以便從一開始就使用消息。再次感謝您的建議。 –