2017-08-20 24 views
1

我正在學習Kafka Stream並使用Processor API來實現我的用例。下面的代碼顯示了Process方法,它在調用commit之前將消息轉發到下游並中止。這會導致流被重新處理並複製宿端上的消息。如何以完全一次的模式實現處理器API

public void process(String key, String value) { 

    context.forward(key, value); 

    .. 
    .. 
    //killed 

    context.commit(); 
} 

processing.guarantee參數:

streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); 

是否有隻調用commit語句時應用轉發的方式。如果不是的話,那麼實施完全一次模式的正確方法是什麼。

謝謝

回答

2

確保您的接收器處於消費模式read_committed,因此它只會看到提交的消息。如果在事務中止之前將消息寫入輸出主題,則在中止時,消息仍然存在,但未在提交時標記。通過事務第二次完成後,消息和提交標記將添加到輸出主題。如果您閱讀時未使用read_committed模式,則會看到所有消息(包括未提交的消息),並且可能會顯示爲重複消息,因爲您會看到中止的結果和已提交的結果。

從0.11的javadoc這裏https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

交易卡夫卡0.11.0介紹了其中的應用程序可以 寫入多個主題和分區原子。爲了使這個 工作,消費者從這些分區讀取應配置 只讀取提交的數據。這可以通過在消費者配置中設置 isolation.level = read_committed來實現。

在read_committed模式下,使用者將只讀取那些已成功提交的事務性消息。它會像以前一樣繼續讀取非事務性消息。在read_committed模式下沒有 客戶端緩衝。相反,讀取提交的使用者的分區的結尾偏移量 將是屬於打開事務的分區中的第一個消息的偏移量 。 此偏移量被稱爲「最後穩定偏移量」(LSO)。

一個read_committed的使用者將只讀取直到LSO並過濾掉已被中止的任何事務性消息的 。 LSO還 影響readTo_nd(Collection)和 endOffsets(Collection)對於read_committed使用者的行爲,其中 的詳細信息位於每個方法的文檔中。最後,讀取滯後度量標準爲 也被調整爲相對於read_committed使用者的LSO。 帶有事務性消息的分區將包含提交或中止 指示事務結果的標記。標記 未返回到應用程序,但在日誌中有偏移量。作爲 的結果,從具有事務性消息 的主題中讀取的應用程序將看到消耗的偏移量中的差距。這些丟失的消息將是事務標記的 ,並且它們被隔離級別的 中的消費者過濾掉。此外,使用read_committed 的應用程序消費者也可能會看到由於中止事務而導致的間隙,因爲這些消息不會被消費者返回,而會有 有效的偏移量。

0

你可能喜歡包裹context.commit()在finally塊來確保它被調用。但是,您還需要確保在成功處理後確實調用了它。

+0

嗨,蘇尼爾,只有當過程停止優雅時纔有效。 – user3812692

相關問題