2016-08-11 31 views
0

我使用的是spring-kafka和spring-kafka測試版本1.0.2.RELEASE。只有第一次獲得一個記錄卡夫卡消費者獲得記錄?

在我的測試之一,我的應用程序發送100條記錄在一排單TopicPartion上使用KafkaTemplate,大多默認的配置設置的EmbeddedKafka實例。

我使用KafkaTestUtils.getRecords(消費者)的方法來嘗試獲得從卡夫卡實例的記錄,並驗證它們已經全部發送。

我第一次打電話getRecords,我只收到一條記錄。如果我再次調用它,我得到其他99

如果我明確設定消費者的位置到TopicPartition的開頭,然後調用getRecords,我得到的所有100

爲什麼getRecords只能得到單一記錄第一次?是否有一些更好的方法可以一次獲得全部100個,然後通過在消費者上明確調用seekToBeginning?

回答

0

最有可能只是一個競爭條件 - 消費者坐在poll()和經紀人儘快到達發送的第一條消息。

請參閱kafka docs中的屬性fetch.min.bytesfetch.max.wait.ms

fetch.min.bytes默認爲1。

編輯

您也可以嘗試flush()調用getRecords()之前荷蘭國際集團的KafkaTemplate

但是,您的測試真的不應該依靠得到一個獲取所有信息 - 太脆。

+0

同樣的事情發生,如果我沖洗'KafkaTemplate'然後等待調用'getRecords()'之前5秒鐘。增加fetch.min.bytes的值會增加第一次調用getRecords返回的記錄總數。我預計5秒鐘的時間足以讓所有消息傳遞給經紀人。接下來的〜80條消息都可用於緊接在第一條之後的下一個「getRecords」調用。是否還有其他措施可以保證所有消息在消費者閱讀之前存在,或消費者是否會閱讀所有可用信息? –

+0

我有點驚訝'flush()沒有幫助,但正如我在上次編輯中所說的那樣 - 如果依靠單獨的時間進行測試,將會變得很脆弱,因此需要足夠大的'fetch.min.bytes'一個大的'fetch.max.wait.ms'可能是獲得可靠測試的唯一方法。它會讓你的測試運行時間更長(除非你能確切地計算出100個消息的字節數 - 以開銷爲單位),但它是可靠的(直到kafka改變開銷的大小:))。 –

+0

是的,我不想在那裏等待。我只是試圖測試問題是否這些消息是否仍然在飛往卡夫卡。看起來消費者第一次被叫時,所有的消息都應該在那裏。但是,第一次,消費者似乎只是讀足夠的消息來滿足'fetch.min.bytes'的值,然後在第二次調用時它將讀取儘可能多的其他消息,而不管大小如何。爲什麼第一次調用依賴於'fetch.min.bytes'值,但第二次調用不是? –

0

這聽起來像是一個計時問題。第一次調用poll()時,很可能只有一條消息可用 - 該方法無法保證將提取多少條消息。當你編寫代碼時,你不應該假設你會一口氣收到X條記錄。有一個卡夫卡0.10 max.poll.records的消費者財產,爲了測試目的,你可能想設置爲1,然後執行接收循環,直到你已經輪詢全部100.