2017-09-10 175 views
1

我在消費者組中的卡夫卡輪詢消息有問題。 我的消費對象分配給一個給定的分區與卡夫卡消費者調查消息與python

self.ps = TopicPartition(topic, partition) 

之後,消費者分配到該分區:

self.consumer.assign([self.ps]) 

之後,我能夠與

算分區內部消息
self.consumer.seek_to_beginning(self.ps) 
pos = self.consumer.position(self.ps) 

and self.consumer.seek_to_end(self.ps) .....

在我的tpoic中有超過30000條消息。 問題是我只收到一條消息。

使用者配置有: max_poll_records= 200 AUTO_OFFSET_RESET是最早

這裏是我的功能與此我想獲得消息:

def poll_messages(self): 


    data = [] 

    messages = self.consumer.poll(timeout_ms=6000) 


    for partition, msgs in six.iteritems(messages): 

     for msg in msgs: 

      data.append(msg) 

    return data 

即使我去了第一個可用的前偏移開始輪詢郵件 我只收到一封郵件。

self.consumer.seek(self.ps, self.get_first_offset()) 

我希望有人能解釋我做錯了什麼。 在此先感謝。

祝 喬恩

回答

0

我相信你是誤解max_poll_records - 這並不意味着你會得到200個調查,只是限制在最你可能會得到。您需要多次調用輪詢。我是指你的文檔進行簡單的例子:http://kafka-python.readthedocs.io/en/master/usage.html

我相信一個更標準的實現是:

for message in self.consumer: 
    # do stuff like: 
    print(msg) 
+0

不幸的是尼克,我相信你的例子是阻塞調用 – cs94njw

+0

同意。你爲什麼提到這個?它不影響輪詢機制嗎?我沒有審查源代碼。 – Nick

+0

會發生什麼情況是,如果隊列中沒有消息(無法讀取),則for循環不會移動。 這不是一個問題,但它會降低靈活性。 上面的「soa」代碼使用輪詢,它將在隊列中等待幾秒鐘,然後執行其他操作。 我認爲「soa」正在尋找投票解決方案。 – cs94njw