2013-02-07 280 views
7

我讀過Kafka網站上的文檔,但在嘗試實現一個完整的最小示例(生產者 - >卡夫卡 - >消費者)後,我不太清楚「消費者狀態」抵消需要處理。Apache Kafka:消費者狀態

一些信息

  1. 我使用高層API(Java)的
  2. 我的消費者是一個簡單的類可以在「快速開始」海邊的卡夫卡
  3. 頁上找到一個主,基本上是相同的
  4. 我使用的動物園管理員
  5. 我使用一個單一的經紀

現在,文檔說高層API消費者STOR上課用動物管理員,所以我期望的抵消,因此消費者的狀態將保持卡夫卡代理重新啓動

之間
  • 其狀態
  • 消費者重新

但遺憾的是它沒有:每個當我重新啓動經紀人或消費者時,所有消息都會重新發送。 現在,也許這些都是愚蠢的問題,但

  1. 在卡夫卡的情況下重新啓動:我明白,是由消費者來保持其狀態,所以可能當經紀人(重新)啓動重新傳遞所有(! )消息和消費者決定要消費什麼......是嗎?如果是這樣,如果我有10.0000.0000條消息會發生什麼?

  2. 如果JVM消費者重新啓動:如果狀態保持在Zookeeper上,爲什麼消息被重新傳遞?新的JVM有不同的消費者「身份」可能嗎?在這種情況下,我如何綁定以前的身份?

回答

2

看來我一直是一個不好的讀者......這一切都在配置頁面。具體來說,我的兩個問題都可以通過設置一個默認爲「最小」的標誌「autooffset.reset」來解決,因此會引起所描述的效果。

現在,在價值「最大」的情況下,無論是消費者還是經紀人重新啓動,事情都按預期工作,因爲抵消總是最大。

4

是的,消費者負責保持其狀態,並且Java高級消費者將其狀態保存在動物園管理員中。

很有可能您沒有指定groupId配置屬性。在這種情況下,卡夫卡隨機生成groupId

也有可能您關閉autocommit.enable配置屬性。

可以在此頁面找到完整的卡夫卡配置參考:http://kafka.apache.org/configuration.html根據「高級使用者的重要配置屬性」標題爲

4

回答原來的問題:使用的groupId有助於避免「重消費,從一開始的時候所有消息」的局面

如果更改的groupId,你會得到的那一刻起隊列創建的所有消息(或自上次基於卡夫卡日誌保留策略的數據清除)

不要將此與kafka-console-consumer「--from-beginning」標誌(其設置auto.offset.reset選項)混淆在下面的選項1和2之間進行選擇:

1)從消費最後一條消息開始消耗新消息(N OT從最初創建kafka隊列時開始):

props.put(「auto.offset.reset」,「smallest」);

2)消費從目前用戶JVM啓動新郵件(在這種情況下,你的風險放入隊列中,而用戶是向下而不是失蹤的消息聽隊列):

props.put( 「auto.offset.reset」, 「最大」);


側面說明:下面只切向有關原來的問題

一個更高級的使用情況 - 如果你想以編程設定消費者偏移可以重播一定時間的消息開始 - 它需要使用SimpleConsumer API,如https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example所示,以便找到從正確代理/分區重播的最小偏移量。這基本上是用我們自己的FindLeader邏輯來替代zookeeper。非常棘手。

對於這個用例(從特定用戶指定時間開始的消息的臨時重播),我們決定存儲消息的本地緩存並在本地管理偏移而不是使用kafka偏移量管理api(這將需要重新實現一個好的大量的動物園管理員功能與SimpleConsumer)。

I.e.將kafka視爲「郵遞員」,一旦郵件被髮送到本地郵箱,並且如果我們需要返回到過去的特定偏移量,並且例如重放(已經被消費的)消息,例如在消費者應用程序錯誤的情況下,我們不會回到「郵局」(卡夫卡經紀人)來找出正確的遞送順序,但在本地管理它。附帶說明的

年底

+0

你能對你如何管理從卡夫卡偏移在本地而不是詳細點嗎?就像你如何確定和計算髮送給每個消息的偏移量然後消耗。 – David

+0

一旦消耗 - 將當前時間戳添加爲msg id,並在hsql中將消息存儲爲二進制blob(它以avro格式發送,我們不會反序列化)hsql(持久化到磁盤),或者您可以使用apache phoenix和歸檔它在二進制格式有兩列ID(時間戳),消息(VARBINARY) – alex

+0

但是,這與消息偏移有什麼關係? Kafka偏移量值不是消息的時間戳或二進制編碼,也不是它的哈希值?我還是卡夫卡的新手,請原諒我的無知。 – David