2017-08-20 63 views
1

我需要能夠從地圖狀態中刪除比固定時間早的舊密鑰。 我目前保持關鍵狀態圖中每個事件的時間戳,我想有一個ansyncronous進程將刪除這些陳舊的密鑰。在Flink Mapstate中刪除TTL過期密鑰

我使用RocksDB作爲狀態後端,我不認爲RocksDB的Java API支持使用TTL打開的here

所以我的問題是:

  • 是它在所有可能擁有一個具有訪問Mapstate因爲它在操作功能上運行的異步線程?
  • 在這種情況下是否有更好的做法?

在此先感謝,在弗林克過期狀態

回答

2

一個簡單的方法是使用一個ProcessFunction運營商保持狀態。然後,您可以使用計時器(處理時間計時器或事件時間計時器,具體取決於您的應用程序的意義),並清除onTimer方法中的狀態。

+0

Flink的Table API顯示了一個狀態清理的例子:https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/ flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala – twalthr

+0

嗨,感謝您的快速回復。你所建議的解決方案並不差,但我唯一缺少的是ProcessFunction似乎沒有以異步方式拍攝OnTimer回調(對嗎? – Eliran

+0

對於事件時間計時器,OnTimer回調獲取在處理流元素的同一線程中調用,因爲它處理觸發定時器的水印。對於進程定時器,有一個單獨的線程來實現定時器服務。 –