2014-10-29 101 views
1

我試圖找出順序執行一系列期貨的最好方法,其中一個未來的執行取決於前一個。我試圖爲任意數量的期貨做這件事。斯卡拉 - 依次執行任意數量的期貨但依賴於

用戶案例:

  • 我從數據庫中檢索了一些IDS的。
  • 我現在需要檢索Web服務上的一些相關數據。
  • 我想要停止,一旦我找到了有效的結果。
  • 我只關心成功的結果。

並行執行這些操作,然後解析返回的結果集合不是一個選項。我必須一次執行一個請求,並且只有在前一個請求沒有返回結果時才執行下一個請求。

目前的解決方案是沿着這些路線。使用foldLeft執行請求,然後僅在前一個未來滿足某些條件時評估未來的未來。

def dblFuture(i: Int) = { i * 2 } 
val list = List(1,2,3,4,5) 
val future = list.foldLeft(Future(0)) { 
    (previousFuture, next) => { 
    for { 
     previousResult <- previousFuture 
     nextFuture <- { if (previousResult <= 4) dblFuture(next) else previousFuture } 
    } yield (nextFuture) 
    } 
} 

這樣做的很大的弊端是:a)我一直在處理所有的項目甚至有一次我有一個結果,我很高興和b)一旦我找到了結果我之後,我一直在評估謂詞。在這種情況下,這很簡單,但實際上它可能更復雜。

我覺得我錯過了一個更優雅的解決方案。

+0

我對用例困惑,因爲它似乎沒有就像它具有您所描述的那種數據流依賴性(「Future的執行取決於前一個」),因爲如果前一個Future的結果是* empty *,那麼您只執行下一個Future。我錯過了什麼?也就是說,未來的未來除了僅僅決定它是否執行之外,還會以任何方式依賴於以前的結果? – 2014-10-29 03:08:39

+0

這與http:// stackoverflow非常相似。com/questions/26438991/is-there-sequential-future-find/26439838#26439838(請參閱我的答案)和http://stackoverflow.com/questions/26349318/how-to-invoke-a-method-again-並且直到它返回一個未來值包含 – 2014-10-29 06:04:06

+0

@ChrisMartin下一個期貨執行不僅取決於前一個Future是否成功,還取決於響應。例如:如果以前的未來包含狀態爲404的WSResponse,則執行下一個未來,否則不執行。 – healsjnr 2014-10-29 23:29:46

回答

5

看看你的例子,好像前面的結果對後續結果沒有影響,而只是重要的是前一個結果滿足一些條件以防止計算下一個結果。如果是這樣的話,這裏是使用filterrecoverWith的遞歸解決方案。

def untilFirstSuccess[A, B](f: A => Future[B])(condition: B => Boolean)(list: List[A]): Future[B] = { 
    list match { 
     case head :: tail => f(head).filter(condition).recoverWith { case _: Throwable => untilFirstSuccess(f)(condition)(tail) } 
     case Nil => Future.failed(new Exception("All failed..")) 
    } 
} 
Future完成

filter纔會被調用,如果Future失敗recoverWith纔會被調用。

def dblFuture(i: Int): Future[Int] = Future { 
    println("Executing.. " + i) 
    i * 2 
} 

val list = List(1, 2, 3, 4, 5) 

scala> untilFirstSuccess(dblFuture)(_ > 6)(list) 
Executing.. 1 
Executing.. 2 
Executing.. 3 
Executing.. 4 
res1: scala.concurrent.Future[Int] = [email protected] 

scala> res1.value 
res2: Option[scala.util.Try[Int]] = Some(Success(8)) 
+0

非常感謝! – Ikrom 2016-08-02 10:24:59

2

最巧妙的方法,而「真正的函數式編程」是scalaz流;)但是你需要從未來階切換到scalaz.concurrent.Task爲抽象的「未來的結果」。這有點不同。任務是純粹的,Future是「運行計算」,但他們有很多共同之處。

import scalaz.concurrent.Task 
    import scalaz.stream.Process 

    def dblTask(i: Int) = Task { 
    println(s"Executing task $i") 
    i * 2 
    } 

    val list = Seq(1,2,3,4,5) 

    val p: Process[Task, Int] = Process.emitAll(list) 

    val result: Task[Option[Int]] = 
    p.flatMap(i => Process.eval(dblTask(i))).takeWhile(_ < 10).runLast 

    println(s"result = ${result.run}") 

結果:

Executing task 1 
Executing task 2 
Executing task 3 
Executing task 4 
Executing task 5 
result = Some(8) 

,如果你的計算已經斯卡拉未來,你可以把它轉換爲任務

implicit class Transformer[+T](fut: => SFuture[T]) { 
    def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = { 
    import scala.util.{Failure, Success} 
    import scalaz.syntax.either._ 
    Task.async { 
     register => 
     fut.onComplete { 
      case Success(v) => register(v.right) 
      case Failure(ex) => register(ex.left) 
     } 
    } 
    } 
} 
+0

感謝尤金,我很喜歡這個解決方案,但我目前沒有使用ScalaZ,希望能夠使用標準Scala未來的解決方案(因此我將@LimbSoup標記爲正確答案)。儘管如此,請詳細看看SacalZ。 – healsjnr 2014-10-29 23:33:23