kafka流處理在我們的系統中執行以進行事務處理。解決方案如下實現,流處理後從kafka主題中移除郵件
卡夫卡製作者向卡夫卡主題發佈事件,流處理器處理輸入事件並執行聚合操作。流處理之後,該事件將發佈到另一個主題。由於在第一個主題中沒有實現消費者,我如何從第一個主題中刪除處理後的消息。
kafka流處理在我們的系統中執行以進行事務處理。解決方案如下實現,流處理後從kafka主題中移除郵件
卡夫卡製作者向卡夫卡主題發佈事件,流處理器處理輸入事件並執行聚合操作。流處理之後,該事件將發佈到另一個主題。由於在第一個主題中沒有實現消費者,我如何從第一個主題中刪除處理後的消息。
考慮到您的流處理鏈是第一個主題的使用者。如果您出於某種原因需要重新處理原始數據(例如,如果您意識到流處理邏輯中存在錯誤),那麼即使在處理完第一個主題後,您也可能希望獲得原始消息。
因此,您不需要刪除郵件,您必須在該主題上設置適合您需求的保留策略。折衷通常是數據可用時間與需要的存儲量之間的關係。
無法手動從kafka中刪除郵件(無法在磁盤上刪除數據,AFAIK)。你有三種選擇:
使用基於時間的保留策略(例如讓卡夫卡刪除所有郵件自動年齡超過1小時)
使用基於存儲的保留策略(讓卡夫卡保持話題尺寸是一些預定義的值)
使用主題壓縮策略 - 讓kafka保留您的密鑰的最新版本。所有舊版本的密鑰將被刪除(壓縮)。
正如Luciano Afranllie所述 - 您不需要手動刪除消息。您可以處理消息並讓卡夫卡根據您的策略管理主題。
有一個Kafka改進方案(KIP)爲這個用例準確添加這個功能。
目前所有Scala代碼做郵件刪除是0.11卡夫卡和已經過測試工作
https://github.com/apache/kafka/pull/2476
然而,除了這個功能在Java的AdminClient API中並且文檔尚未完成。