我使用的是spring-kafka和spring-kafka測試版本1.0.2.RELEASE。只有第一次獲得一個記錄卡夫卡消費者獲得記錄?
在我的測試之一,我的應用程序發送100條記錄在一排單TopicPartion上使用KafkaTemplate,大多默認的配置設置的EmbeddedKafka實例。
我使用KafkaTestUtils.getRecords(消費者)的方法來嘗試獲得從卡夫卡實例的記錄,並驗證它們已經全部發送。
我第一次打電話getRecords,我只收到一條記錄。如果我再次調用它,我得到其他99
如果我明確設定消費者的位置到TopicPartition的開頭,然後調用getRecords,我得到的所有100
爲什麼getRecords只能得到單一記錄第一次?是否有一些更好的方法可以一次獲得全部100個,然後通過在消費者上明確調用seekToBeginning?
同樣的事情發生,如果我沖洗'KafkaTemplate'然後等待調用'getRecords()'之前5秒鐘。增加fetch.min.bytes的值會增加第一次調用getRecords返回的記錄總數。我預計5秒鐘的時間足以讓所有消息傳遞給經紀人。接下來的〜80條消息都可用於緊接在第一條之後的下一個「getRecords」調用。是否還有其他措施可以保證所有消息在消費者閱讀之前存在,或消費者是否會閱讀所有可用信息? –
我有點驚訝'flush()沒有幫助,但正如我在上次編輯中所說的那樣 - 如果依靠單獨的時間進行測試,將會變得很脆弱,因此需要足夠大的'fetch.min.bytes'一個大的'fetch.max.wait.ms'可能是獲得可靠測試的唯一方法。它會讓你的測試運行時間更長(除非你能確切地計算出100個消息的字節數 - 以開銷爲單位),但它是可靠的(直到kafka改變開銷的大小:))。 –
是的,我不想在那裏等待。我只是試圖測試問題是否這些消息是否仍然在飛往卡夫卡。看起來消費者第一次被叫時,所有的消息都應該在那裏。但是,第一次,消費者似乎只是讀足夠的消息來滿足'fetch.min.bytes'的值,然後在第二次調用時它將讀取儘可能多的其他消息,而不管大小如何。爲什麼第一次調用依賴於'fetch.min.bytes'值,但第二次調用不是? –