2016-04-06 56 views
1

我有一個應用程序預加載100萬字符串的主題有4個分區,所以250K電子郵件。爲什麼我所有的卡夫卡提交都沒有實際提交? (0.9.0.1)

然後,應用程序會調用2名讀者手動分配的分區:

Reader 1 -> Partition 0 & 1 
Reader 2 -> Partition 2 & 3 

我通過嗡嗡聲和從分區讀取並執行每個ConsumerRecord一個consumer.commitAsync我讀取(沒有提交的配料在這一點,故意,直到我明白行爲)。

我在提交異步回調內部放置了一個per-topic計數器來測量它被調用的次數,總計達到100萬。該應用程序穩定下來,並停止

後,我用的是卡夫卡CLI工具來看看我的補償,我得到這樣的:

Group   Topic       Pid Offset   logSize   Lag    Owner 
group1   lowercaseStrings    0 233788   250000   16212   none 
group1   lowercaseStrings    1 249999   250000   1    none 
group1   lowercaseStrings    2 249999   250000   1    none 
group1   lowercaseStrings    3 233788   250000   16212   none 

注意的1兩的這些滯後是巧合 - 我有時得到不同的數字。

我注意到任何傳入的異常的異步提交回調函數,並且沒有。根據我的代碼,我已經按預期調用了100萬次commitAsync。

爲什麼我仍然有這種滯後?什麼可能導致這個?

回答

0

卡夫卡批量提取郵件。當您在消息流中調用next()時,可能發生兩件事: - 本地沒有數據,您的客戶將在內部調用poll(),它將更新其偏移狀態 - 有一些,提前在本地批處理。

當您調用commitAsync()時,您提交了在最後一次poll()調用時獲得的偏移量,而不是通過內部迭代實現的偏移量。

您可以通過減小接受的批處理的大小(在配置中設置batch.size)來控制此行爲,如果您真的想爲每條消息輪詢(),則可以將其控制爲0。我預計,通過將此配置降至0,您將在任何地方達到0滯後(請注意,對於可擴展使用,這會破壞吞吐量)。

相關問題