2017-02-27 40 views
0

我想定期從一些源提取數據,每小時一次。我這樣做,因爲數據獲取花費了大量的時間,大約10分鐘。所以,我緩存這些數據。在不帶var/mutable集合的Scala中定期提取數據的方式

我有這樣的代碼現在:

import java.util._ 

object Loader { 
    @volatile private var map: Map[SomeKey, SomeValue] = Map() 

    def start() { 
     val timer = new Timer() 
     val timerTask = new TimerTask { 
      override def run() { 
       reload() 
      } 
     } 
     val oneHour = 60 * 60 * 1000 
     timer.schedule(timerTask, oneHour) 
    } 

    def reload() { 
     map = loadMap() 
    } 

    // this method invocation costs a lot, so, I cache it in reload() 
    def loadMap(): Map[SomeKey, SomeValue] = ... 

    def getValue(key: SomeKey): Option[SomeValue] = map.get(key) 
} 

而且,我有我的main()功能Loader.start()調用。

這很好,但我想知道,有沒有辦法以更多功能的方式編寫它:不使用可變集合來擺脫var?

+0

如果你使用(volatile)'var','Map'不一定是可變的(在上面的代碼中似乎是這種情況) –

+0

@BrunoGrieder是的,Map不一定是可變的,而事實並非如此。問題是,我們可以在不使用可變映射的情況下以某種方式擺脫這個變種。我編輯我的問題更清楚。 –

+0

如果你想改變狀態並保持它,某些東西必須是可變的。也許你可以通過在Monad Transformer中封裝狀態並「攜帶」狀態而逃脫,但我甚至無法想象這將如何。 ScalaZ專家可能會跳入此處 –

回答

2

在這種使用情況下,組合IO庫scalaz-stream成功地在您的代碼中保持可變性。首先,依賴關係:

libraryDependencies ++= Seq(
    "org.scalaz" %% "scalaz-core" % "7.2.8", 
    "org.scalaz" %% "scalaz-concurrent" % "7.2.8", 
    "org.scalaz.stream" %% "scalaz-stream" % "0.8.6a" 
) 

我們先從scalaz-concurrentscalaz-stream部分進口:

import java.util.concurrent.ScheduledExecutorService 
import scala.concurrent.duration._ 
import scalaz.concurrent.Task 
import scalaz.stream.time._ 
import scalaz.stream._ 
import scalaz.stream.Process.Env 
import scalaz.stream.ReceiveY._ 

假設我們有能夠提取其快照的必要數據源。爲了演示,它還更新本身每提取:

trait DataSource[Key, Value] { 

    def loadMap: Map[Key, Value] 
} 

object DataSourceStub extends DataSource[Int, String] { 

    private var externalSource: Map[Int, String] = Map(1 -> "a") 

    def loadMap: Map[Int, String] = { 
    val snapshot = externalSource 
    val key = snapshot.keys.max 
    val value = snapshot(key) 
    val (newKey, newValue) = (key + 1) -> (value + "a") 
    val newSource = snapshot + (newKey -> newValue) 
    externalSource = newSource 
    snapshot 
    } 
} 

現在,我們通過引入timer上開始它會立即發出單元事件開始Loader執行而發出的每refreshEvery秒。然後,報告我們cacheStates的事件流可以通過將數據接收Task映射到每個事件並在流內對它們進行評估來獲得。困難的部分是換向:我們需要使用我們的定期快照流來交織一系列的流(使用緩存中的數據執行某些操作的函數)。 scalaz-stream提供了一個流換向工具Wye,它允許我們說明我們將從哪個順序處理來自輸入流的事件。我們需要一個初始高速緩存快照來處理,所以我們從wye.receiveL開始,移動到handleImpl,初始高速緩存狀態。現在,我們可以receiveBoth接收任何事件:

  • 如果它是一個高速緩存快照更新,我們重現它不產生輸出;
  • 如果它是一個請求,我們給它當前的緩存狀態,並將生成的Task發送到輸出中,同時在當前狀態下重複;
  • 如果其中一個輸入流終止,我們停止處理。

剩下的唯一一件事就是帶着handleWye,包括副作用的處理任務分成流,這是我們在做processRequests加入我們的投入。

class Loader[Key, Value](dataSource: DataSource[Key, Value], refreshEvery: Duration) { 

    type CacheState = Map[Key, Value] 
    type Request = CacheState => Task[Unit] 
    type ReaderEnv = Env[CacheState, Request] 

    implicit val scheduler: ScheduledExecutorService = DefaultScheduler 

    private val timer: Process[Task, Unit] = 
    Process.emit(()) ++ awakeEvery(refreshEvery).map(_ =>()) 

    private val cacheStates: Process[Task, CacheState] = 
    timer.evalMap(_ => Task(dataSource.loadMap)) 

    private val handle: Wye[CacheState, Request, Task[Unit]] = { 
    def handleImpl(current: CacheState): Wye[CacheState, Request, Task[Unit]] = { 
     import wye._ 
     import Process._ 
     receiveBoth { 
     case ReceiveL(i) => handleImpl(i) 
     case ReceiveR(i) => emit(i(current)) ++ handleImpl(current) 
     case HaltOne(rsn) => Halt(rsn) 
     } 
    } 
    wye.receiveL[CacheState, Request, Task[Unit]](handleImpl) 
    } 

    def processRequests(requests: Process[Task, Request]): Process[Task, Unit] = 
    cacheStates.wye(requests)(handle).eval 
} 

讓我們通過最大ID發佈數據的100個請求測試我們的數據加載器(每100毫秒),它在執行每秒刷新一次:

object TestStreamBatching { 

    private val loader = new Loader(DataSourceStub, 1.second) 

    private def request(cache: loader.CacheState): Task[Unit] = Task { 
    Thread.sleep(100) 
    val key = cache.keys.max 
    val value = cache(key) 
    println(value) 
    } 

    private val requests: Process[Task, loader.Request] = 
    Process.unfold(100)(s => if(s > 0) Some((request, s - 1)) else None) 

    def main(args: Array[String]): Unit = { 
    loader.processRequests(requests).run.unsafePerformSync 
    } 
} 

通過運行它,你可以看到一個梯子'a'字母每秒增加其立管尺寸,最終在100個輸出後終止。

+0

順便說一下,在這種情況下,我們也可以使用阿卡流:) –

+0

但是,請注意,阿卡流雖然是背壓流的良好實施,但它是一種必不可少的設計,它不會保留參考透明度,甚至不必這樣的概念。當用它實現函數式編程結構時,你需要仔細觀察它是否會在看似無效的調用(如Scala的立即啓動提供函數的'Future'構造函數)中做任何事情。 –

相關問題