2017-10-20 32 views
1

我有3個分區的話題,我想用從每一特定分區閱讀下面的代碼從使用Python

from kafka import KafkaConsumer, TopicPartition 

brokers = 'localhost:9092' 
topic = 'b3' 

m = KafkaConsumer(topic, bootstrap_servers=['localhost:9092']) 
par = TopicPartition(topic=topic, partition=1) 
m.assign(par) 

但我得到這個錯誤具體卡夫卡主題閱讀:

raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) 
kafka.errors.IllegalStateError: IllegalStateError: You must choose only one way to configure your consumer: (1) subscribe to specific topics by name, (2) subscribe to topics matching a regex pattern, (3) assign itself specific topic-partitions. 

有人可以幫助我嗎?

回答

0

你可以刪除KafkaConsumer(專題參數),然後再試一次?

例如:

# manually assign the partition list for the consumer 
from kafka import TopicPartition, KafkaConsumer 
consumer = KafkaConsumer(bootstrap_servers='localhost:1234') 
consumer.assign([TopicPartition('foobar', 2)]) 
msg = next(consumer) 

裁判:http://kafka-python.readthedocs.io/en/master/

+0

你怎麼會插入超時這裏功能?如果沒有例如1的新消息出現,我想停止流式傳輸過程。 –

+0

。看看'poll'功能,它可以讓你設置超時。 http://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html?highlight=poll#kafka.KafkaConsumer.poll –

+0

BTW:難道我回答你原來的問題? :-) –