2015-04-28 35 views
4

我想使用Kafka高級消費者API,並在同一時間,我想禁用自動提交的偏移量。我試圖通過以下步驟來實現這一點。卡夫卡offsetcommit請求與高級消費者API

1) auto.commit.enable = false 
2) offsets.storage = kafka 
3) dual.commit.enabled = false 

我創建了一個偏移量管理器,該管理器週期性地創建一個offsetcommit請求給kafka並提交偏移量。

儘管如此,我有以下幾個問題

1)是否高層次的消費API自動獲取從卡夫卡存儲偏移,並與偏移初始化本身?或者我應該使用簡單的消費者API來實現這一目標?

2)基於kafka的補償存儲是否在所有經紀商中都有效?或者它只在一個經紀人上維護?

回答

0

我創建了一個偏移量管理器,它週期性地爲kafka創建offsetcommit請求並提交偏移量。

你不需要做,如果你使用的是高級用戶,其提供了方法手動提交偏移,在javadoc(下手動偏移控制)提供了有關如何做到這一點的例子。

1)是否高層次的消費API自動獲取從卡夫卡存儲偏移,並與偏移初始化本身?或者我應該使用簡單的消費者API來實現這一目標?

當您重新啓動它時,高級用戶將負責提取上次提交的偏移量,因此您可以從停止的位置繼續使用。

2)基於kafka的偏移量存儲是否在所有經紀人中被重複使用?或者它只在一個經紀人上維護?

卡夫卡存儲消費者偏移名爲__consumer_offsets內部主題中,默認情況下它的replication因子設置爲3 50個分區。所以它在3個經紀人中被複制。您可以在broker config找到更多關於其配置的信息,它們以offsetoffsets開頭。