2015-07-06 114 views
2

我是Kafka的新手。通過從卡夫卡蟒蛇的幾個在線教程以幫助我寫了下面的代碼:在kafka-python客戶端使用消息

from kafka import SimpleProducer, KafkaClient, KafkaConsumer 
    kafka = KafkaClient("localhost:9092") 
    producer = SimpleProducer(kafka) 
    producer.send_messages(b'my-topic', b'this method', b'Hello World') 
    consumer = KafkaConsumer('my-topic', 
        group_id='my_group', 
        bootstrap_servers=['localhost:9092']) 
    for message in consumer: 
     print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, 
             message.offset, message.key, 
             message.value)) 

但問題是,在這最後的循環代碼執行卡住了,我想不通。

+0

我想我找到了答案。它似乎消耗來自SimpleProducer消費者的消息必須是SimpleConsumer的實例,因爲在執行該更改後代碼似乎工作。如果我錯了,請糾正。謝謝。 – Joy

回答

2

你原來的代碼是正確的。我運行你的代碼。 KafkaConsumer可以用來消費消息。如果您打開另一個控制檯並運行相同的原始代碼,您將看到輸出。

http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html,有很多Consumer類,如SimpleConsumer,KafkaConsumer,Consumer等等。爲什麼你的代碼被卡在for循環中?因爲默認情況下,代碼中的客戶設置消耗新的消息。在這種情況下,producer.send_messages()函數生成的消息不會被消費者使用。順便說一句,如果你使用SimpleConsumer,你可以使用seek()來設置你想消費的消息的偏移量。