在這種使用情況下,組合IO庫scalaz-stream成功地在您的代碼中保持可變性。首先,依賴關係:
libraryDependencies ++= Seq(
"org.scalaz" %% "scalaz-core" % "7.2.8",
"org.scalaz" %% "scalaz-concurrent" % "7.2.8",
"org.scalaz.stream" %% "scalaz-stream" % "0.8.6a"
)
我們先從scalaz-concurrent
和scalaz-stream
部分進口:
import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.duration._
import scalaz.concurrent.Task
import scalaz.stream.time._
import scalaz.stream._
import scalaz.stream.Process.Env
import scalaz.stream.ReceiveY._
假設我們有能夠提取其快照的必要數據源。爲了演示,它還更新本身每提取:
trait DataSource[Key, Value] {
def loadMap: Map[Key, Value]
}
object DataSourceStub extends DataSource[Int, String] {
private var externalSource: Map[Int, String] = Map(1 -> "a")
def loadMap: Map[Int, String] = {
val snapshot = externalSource
val key = snapshot.keys.max
val value = snapshot(key)
val (newKey, newValue) = (key + 1) -> (value + "a")
val newSource = snapshot + (newKey -> newValue)
externalSource = newSource
snapshot
}
}
現在,我們通過引入timer
上開始它會立即發出單元事件開始Loader
執行而發出的每refreshEvery
秒。然後,報告我們cacheStates
的事件流可以通過將數據接收Task
映射到每個事件並在流內對它們進行評估來獲得。困難的部分是換向:我們需要使用我們的定期快照流來交織一系列的流(使用緩存中的數據執行某些操作的函數)。 scalaz-stream
提供了一個流換向工具Wye
,它允許我們說明我們將從哪個順序處理來自輸入流的事件。我們需要一個初始高速緩存快照來處理,所以我們從wye.receiveL
開始,移動到handleImpl
,初始高速緩存狀態。現在,我們可以receiveBoth
接收任何事件:
- 如果它是一個高速緩存快照更新,我們重現它不產生輸出;
- 如果它是一個請求,我們給它當前的緩存狀態,並將生成的
Task
發送到輸出中,同時在當前狀態下重複;
- 如果其中一個輸入流終止,我們停止處理。
剩下的唯一一件事就是帶着handle
Wye
,包括副作用的處理任務分成流,這是我們在做processRequests
加入我們的投入。
class Loader[Key, Value](dataSource: DataSource[Key, Value], refreshEvery: Duration) {
type CacheState = Map[Key, Value]
type Request = CacheState => Task[Unit]
type ReaderEnv = Env[CacheState, Request]
implicit val scheduler: ScheduledExecutorService = DefaultScheduler
private val timer: Process[Task, Unit] =
Process.emit(()) ++ awakeEvery(refreshEvery).map(_ =>())
private val cacheStates: Process[Task, CacheState] =
timer.evalMap(_ => Task(dataSource.loadMap))
private val handle: Wye[CacheState, Request, Task[Unit]] = {
def handleImpl(current: CacheState): Wye[CacheState, Request, Task[Unit]] = {
import wye._
import Process._
receiveBoth {
case ReceiveL(i) => handleImpl(i)
case ReceiveR(i) => emit(i(current)) ++ handleImpl(current)
case HaltOne(rsn) => Halt(rsn)
}
}
wye.receiveL[CacheState, Request, Task[Unit]](handleImpl)
}
def processRequests(requests: Process[Task, Request]): Process[Task, Unit] =
cacheStates.wye(requests)(handle).eval
}
讓我們通過最大ID發佈數據的100個請求測試我們的數據加載器(每100毫秒),它在執行每秒刷新一次:
object TestStreamBatching {
private val loader = new Loader(DataSourceStub, 1.second)
private def request(cache: loader.CacheState): Task[Unit] = Task {
Thread.sleep(100)
val key = cache.keys.max
val value = cache(key)
println(value)
}
private val requests: Process[Task, loader.Request] =
Process.unfold(100)(s => if(s > 0) Some((request, s - 1)) else None)
def main(args: Array[String]): Unit = {
loader.processRequests(requests).run.unsafePerformSync
}
}
通過運行它,你可以看到一個梯子'a'
字母每秒增加其立管尺寸,最終在100個輸出後終止。
如果你使用(volatile)'var','Map'不一定是可變的(在上面的代碼中似乎是這種情況) –
@BrunoGrieder是的,Map不一定是可變的,而事實並非如此。問題是,我們可以在不使用可變映射的情況下以某種方式擺脫這個變種。我編輯我的問題更清楚。 –
如果你想改變狀態並保持它,某些東西必須是可變的。也許你可以通過在Monad Transformer中封裝狀態並「攜帶」狀態而逃脫,但我甚至無法想象這將如何。 ScalaZ專家可能會跳入此處 –