我必須並行運行多個期貨,並且程序不應該崩潰或掛起。並行運行多個期貨,超時返回默認值
現在我要一個接一個地等待期貨,如果有TimeoutException,則使用回退值。
val future1 = // start future1
val future2 = // start future2
val future3 = // start future3
// <- at this point all 3 futures are running
// waits for maximum of timeout1 seconds
val res1 = toFallback(future1, timeout1, Map[String, Int]())
// .. timeout2 seconds
val res2 = toFallback(future2, timeout2, List[Int]())
// ... timeout3 seconds
val res3 = toFallback(future3, timeout3, Map[String, BigInt]())
def toFallback[T](f: Future[T], to: Int, default: T) = {
Try(Await.result(f, to seconds))
.recover { case to: TimeoutException => default }
}
正如我所看到的,這個片段的最長等待時間爲timeout1 + timeout2 + timeout3
我的問題是:我怎麼能等待所有這些期貨的一次,所以等待時間我可以減少max(timeout1, timeout2, timeout3)
?
編輯:最後,我用@Jatin和@senia答案修改:
private def composeWaitingFuture[T](fut: Future[T],
timeout: Int, default: T) =
future { Await.result(fut, timeout seconds) } recover {
case e: Exception => default
}
,後來它的使用方法如下:
// starts futures immediately and waits for maximum of timeoutX seconds
val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]())
val res2 = composeWaitingFuture(future2, timeout2, List[Int]())
val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]())
// takes the maximum of max(timeout1, timeout2, timeout3) to complete
val combinedFuture =
for {
r1 <- res1
r2 <- res2
r3 <- res3
} yield (r1, r2, r3)
,後來我用combinedFuture
,因爲我認爲合適。
我不明白的是,它是如何'TIMEOUT1 + timeout2 + timeout3'? future1的'timeout1',future2的timeout2等等。問題仍然不清楚 – Jatin
他想要並行運行3個任務,以便超時是三個任務超時的最大值 –
我認爲我回答這個問題的答案類似於你想要的,它也是利用非阻塞回調。 http://stackoverflow.com/questions/16304471/scala-futures-built-in-timeout/16305056#16305056 – cmbaxter