2016-06-14 134 views
0

如果消費者使用auto.offset.reset=latest,那麼在發佈者和N使用者使用auto.offset.reset=latest之後,他們會錯過在訂閱該消息之前發佈到某個主題的所有消息......消費者使用auto.offset.reset=latest直到所有的用戶開始使用消息,然後開始出版使卡夫卡消費者在訂閱之前消費現有消息

  1. 製作發行商等待:不重播的話題存在認繳之前的消息...

    所以我需要兩種。不知道如何做到這一點,而不是利用Zookeeper。卡夫卡是否提供了這樣做的手段?

  2. 另一種方法是有auto.offset.reset=latest消費者,使他們明確地消耗掉所有現有的消息之前的情況下,他們即將訂閱與現有消息的主題...

,這是什麼情況下,最好的做法是什麼?

我想消費者必須檢查現有消息的主題,如果有消息,則消費它們,然後啓動消費。這聽起來像是對我來說最好的辦法......

+0

使用'auto.offset.reset =最早'有沒有什麼壞處? – avr

+0

如果你使用最早的,你需要記憶或堅持最後的偏移量,以便知道上次停止的位置,對嗎?我只是意識到,這兩個選項都是錯誤的,唯一的方法是獲得最後的偏移量... – lisak

+0

是的,你是對的。你只需要每次跟蹤偏移量。你介意告訴你的用例,比如你在哪裏使用Kafka,以及你正在使用哪種處理引擎,如火花或風暴?無論是批處理還是流媒體? – avr

回答

-1

如果一個高層次的消費得到啓動,它執行以下操作:

  1. 外觀爲它的消費羣

    一個堅定的偏移。如果發現有效偏移量,則從那裏恢復

    b。如果沒有有效的偏移發現,根據auto.offset.reset

因此,auto.offset.reset設定偏移纔會觸發,如果沒有有效的抵消承諾。此行爲是有意和必要的,以便在出現故障時提供至少一次處理保證。

因此,你想讀從一開始的話題,你可以使用一個新的消費group.id並設置auto.offset.reset = earliest,或者你明確修改使用seekToBeginning()啓動時的偏移您啓動poll()循環之前。

0

我們使用Eureka提供的服務發現功能(任何其他服務發現應用可以完成這項工作)+混疊來做選項(1)。基本上,發佈者不會註冊自己(並且開始處理請求或發佈通知),直到至少有一個訂閱者可用。