1
我在消費者組中的卡夫卡輪詢消息有問題。 我的消費對象分配給一個給定的分區與卡夫卡消費者調查消息與python
self.ps = TopicPartition(topic, partition)
之後,消費者分配到該分區:
self.consumer.assign([self.ps])
之後,我能夠與
算分區內部消息self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)
and self.consumer.seek_to_end(self.ps)
.....
在我的tpoic中有超過30000條消息。 問題是我只收到一條消息。
使用者配置有: max_poll_records= 200
AUTO_OFFSET_RESET
是最早
這裏是我的功能與此我想獲得消息:
def poll_messages(self):
data = []
messages = self.consumer.poll(timeout_ms=6000)
for partition, msgs in six.iteritems(messages):
for msg in msgs:
data.append(msg)
return data
即使我去了第一個可用的前偏移開始輪詢郵件 我只收到一封郵件。
self.consumer.seek(self.ps, self.get_first_offset())
我希望有人能解釋我做錯了什麼。 在此先感謝。
祝 喬恩
不幸的是尼克,我相信你的例子是阻塞調用 – cs94njw
同意。你爲什麼提到這個?它不影響輪詢機制嗎?我沒有審查源代碼。 – Nick
會發生什麼情況是,如果隊列中沒有消息(無法讀取),則for循環不會移動。 這不是一個問題,但它會降低靈活性。 上面的「soa」代碼使用輪詢,它將在隊列中等待幾秒鐘,然後執行其他操作。 我認爲「soa」正在尋找投票解決方案。 – cs94njw