2016-02-16 52 views
1

我試圖在卡夫卡創建新主題時啓動動態消費者,但動態啓動的消費者始終缺少啓動/第一條消息,但從此消費該消息。我正在使用kafka-python模塊,並使用更新的KafkaConsumer和KafkaProducer。卡夫卡消費者沒有獲取所有郵件

代碼生產者是

producer = KafkaProducer(bootstrap_servers='localhost:9092') 
record_metadata = producer.send(topic, data) 

和消費者的代碼是

consumer = KafkaConsumer(topic,group_id="abc",bootstrap_servers='localhost:9092',auto_offset_reset='earliest') 

請提出來的東西來過這個問題或任何配置我在我的生產者和消費者實例包括。

+0

你是什麼意思的動態消費者? – puneet

+0

每當創建新主題時,我都會啓動消費者。我們稱這些消費者在運行時就像動態發佈一樣動態。 –

+0

在您的消費者中,您正在提供主題。你如何知道主題名稱?數據發佈後是否創建了消費者流?檢查我的答案...我認爲這是可能的解決方案 – puneet

回答

2

你可以設置auto_offset_reset最早。

創建新消費者流時,它將從最新的偏移量(auto_offset_reset的默認值)開始,並且您將錯過消費者未啓動時發送的消息。

你可以在kafka python doc的地方閱讀。相關部分低於

auto_offset_reset(STR) - 一種在 OffsetOutOfRange錯誤復位偏移政策:「最早的」將移動到最早的可用 消息,「最新的」將移動到最近。任何其他值將會引起異常 。默認:'最新'。