2017-08-14 41 views
0

kafka流處理在我們的系統中執行以進行事務處理。解決方案如下實現,流處理後從kafka主題中移除郵件

卡夫卡製作者向卡夫卡主題發佈事件,流處理器處理輸入事件並執行聚合操作。流處理之後,該事件將發佈到另一個主題。由於在第一個主題中沒有實現消費者,我如何從第一個主題中刪除處理後的消息。

回答

2

考慮到您的流處理鏈是第一個主題的使用者。如果您出於某種原因需要重新處理原始數據(例如,如果您意識到流處理邏輯中存在錯誤),那麼即使在處理完第一個主題後,您也可能希望獲得原始消息。

因此,您不需要刪除郵件,您必須在該主題上設置適合您需求的保留策略。折衷通常是數據可用時間與需要的存儲量之間的關係。

1

無法手動從kafka中刪除郵件(無法在磁盤上刪除數據,AFAIK)。你有三種選擇:

  • 使用基於時間的保留策略(例如讓卡夫卡刪除所有郵件自動年齡超過1小時)

  • 使用基於存儲的保留策略(讓卡夫卡保持話題尺寸是一些預定義的值)

  • 使用主題壓縮策略 - 讓kafka保留您的密鑰的最新版本。所有舊版本的密鑰將被刪除(壓縮)。

正如Luciano Afranllie所述 - 您不需要手動刪除消息。您可以處理消息並讓卡夫卡根據您的策略管理主題。