2017-05-30 37 views
1

我再次嘗試更新一些預播放2.5代碼(基於此vid)。例如,以下以前是如何流式未來Stream Play in Play 2.5

Ok.chunked(Enumerator.generateM(Promise.timeout(Some("hello"), 500))) 

我已經創建了下面的方法用於使用阿卡的變通爲Promise.timeout(廢棄):

private def keepResponding(data: String, delay: FiniteDuration, interval: FiniteDuration): Future[Result] = { 
    val promise: Promise[Result] = Promise[Result]() 
    actorSystem.scheduler.schedule(delay, interval) { promise.success(Ok(data)) } 
    promise.future 
    } 

根據Play Framework Migration Guide; Enumerators改寫爲源Source.unfoldAsync明顯的Enumerator.generateM相當於所以我希望這會工作(其中strFuture[String]):

def inf = Action { request => 
    val str = keepResponding("stream me", 1.second, 2.second) 

    Ok.chunked(Source.unfoldAsync(str)) 
    } 

當然,我得到一個類型不匹配錯誤,望着unfoldAsync的情況下,類簽名時:

final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) 

我可以看到,參數不正確的,但我不完全underst以及我應該怎樣通過這個過程。

回答

1

unfoldAsync甚至比玩更通用的!自己generateM,因爲它可以讓你通過一個狀態(S)值。這可以使發射的值取決於先前發射的值。

下面的例子,直到加載失敗會受到越來越ID載荷值,:

val source: Source[String, NotUsed] = Source.unfoldAsync(0){ id ⇒ 
    loadFromId(id) 
    .map(s ⇒ Some((id + 1, s))) 
    .recover{case _ ⇒ None} 
} 

def loadFromId(id: Int): Future[String] = ??? 

在你的情況是不是真的需要一個內部狀態,因此在需要時可以只通過虛擬值,例如

val source: Source[Result, NotUsed] = Source.unfoldAsync(NotUsed) { _ ⇒ 
    schedule("stream me", 2.seconds).map(x ⇒ Some(NotUsed → x)) 
} 

def schedule(data: String, delay: FiniteDuration): Future[Result] = { 
    akka.pattern.after(delay, system.scheduler){Future.successful(Ok(data))} 
} 

請注意,原來實行的keepResponding不正確,因爲你不能完成Promise不止一次。阿卡after模式提供了一種更簡單的方式來實現你所需要的。

但是請注意,在特定情況下,阿卡流提供帶Source.tick一個更地道的解決方案:如果

val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, NotUsed).mapAsync(1){ _ ⇒ 
    loadSomeFuture() 
} 

def loadSomeFuture(): Future[String] = ??? 

或者更簡單,你實際上並不需要異步計算在你的榜樣

val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, "stream me") 
+0

謝謝。你的第三和第四個例子的工作,但這兩個都需要至少幾分鐘輸出第一個結果,感覺應該不會發生。有什麼辦法可以加速你想到的嗎?也出於某種原因,我無法得到第一個例子的工作 - 一個***前瞻性參考的錯誤擴展了價值源***的定義。謝謝 –