0
我有一個很重的用戶數據流。我想通過它的id來確定這是否是新用戶。爲了減少對數據庫的調用,我寧願在先前用戶的內存中維護一個狀態。在一個流內保持狀態
val users = mutable.set[String]()
//init the state from db
user = db.getAllUsersIds()
val source: Source[User, NotUsed]
val dbSink: Sink[User, NotUsed] //goes to db
//if the user is added to the set it will return true
val usersFilter = Flow[User].filter(user => users.add(user.id))
現在我可以創建一個圖形
source ~> usersFilter ~> dbSink
我的問題是,可變狀態是共享的,不安全。是否有一個選項可以保持流程中的狀態?
您的建議存在主要缺陷。 這將需要很多的請求數據庫,這意味着很多的io。我寧願將它保存在內存中(我將更新我的文章) – igx
至少在我的情況下,我只對數據庫進行1次調用,並將單個查詢中的所有現有ID加載到列表中。然後只需在運行時查看流中的列表(或映射)即可。 –
你認爲這樣的事情會起作用嗎? 例如 'DEF alreadyExists = { VAL alreadyExistingIds = mutable.set [字符串](INIT從分貝) 流量[用戶] .filterNot(用戶=> alreadyExistingIds.add(user.id)) }' – igx