2016-04-03 96 views
0

我試着通過設定偏移,但得到斷言錯誤消耗從主題數據 -Asse田:未分配的分區

from kafka import KafkaConsumer 

consumer = KafkaConsumer('foobar1', 
         bootstrap_servers=['localhost:9092']) 
print 'process started' 
print consumer.partitions_for_topic('foobar1') 
print 'done' 
consumer.seek(0,10) 

for message in consumer: 
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, 
              message.offset, message.key, 
              message.value)) 
print 'process ended' 

錯誤: -

Traceback (most recent call last): 
    File "/Users/pn/Documents/jobs/ccdn/kafka_consumer_1.py", line 21, in <module> 
    consumer.seek(0,10) 
    File "/Users/pn/.virtualenvs/vpsq/lib/python2.7/site-packages/kafka/consumer/group.py", line 549, in seek 
    assert partition in self._subscription.assigned_partitions(), 'Unassigned partition' 
AssertionError: Unassigned partition 

回答

1

你必須調用consumer.assign()在調用seek之前列出TopicPartition。 另請注意,seek的第一個參數也是TopicPartition。 見KafkaConsumer API

0

在我的情況與Kafka 0.9kafka-python,分區分配中for message in consumer發生。所以,迭代之後應該尋求操作。我重置我的組的偏移由以下代碼:

import kafka 

ps = [] 
for i in xrange(topic_partition_number): 
    ps.append(kafka.TopicPartition(topic, i)) 

consumer = kafka.KafkaConsumer(topic, bootstrap_servers=address, group_id=group) 
for msg in consumer: 
    print msg 
    consumer.seek_to_beginning(*ps) 
    consumer.commit() 
    break