2013-07-04 47 views
12

我必須並行運行多個期貨,並且程序不應該崩潰或掛起。並行運行多個期貨,超時返回默認值

現在我要一個接一個地等待期貨,如果有TimeoutException,則使用回退值。

val future1 = // start future1 
val future2 = // start future2 
val future3 = // start future3 

// <- at this point all 3 futures are running 

// waits for maximum of timeout1 seconds 
val res1 = toFallback(future1, timeout1, Map[String, Int]()) 
// .. timeout2 seconds 
val res2 = toFallback(future2, timeout2, List[Int]()) 
// ... timeout3 seconds 
val res3 = toFallback(future3, timeout3, Map[String, BigInt]()) 

def toFallback[T](f: Future[T], to: Int, default: T) = { 
    Try(Await.result(f, to seconds)) 
    .recover { case to: TimeoutException => default } 
} 

正如我所看到的,這個片段的最長等待時間爲timeout1 + timeout2 + timeout3

我的問題是:我怎麼能等待所有這些期貨的一次,所以等待時間我可以減少max(timeout1, timeout2, timeout3)

編輯:最後,我用@Jatin和@senia答案修改:

private def composeWaitingFuture[T](fut: Future[T], 
            timeout: Int, default: T) = 
    future { Await.result(fut, timeout seconds) } recover { 
    case e: Exception => default 
    } 

,後來它的使用方法如下:

// starts futures immediately and waits for maximum of timeoutX seconds 
val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]()) 
val res2 = composeWaitingFuture(future2, timeout2, List[Int]()) 
val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]()) 

// takes the maximum of max(timeout1, timeout2, timeout3) to complete 
val combinedFuture = 
    for { 
    r1 <- res1 
    r2 <- res2 
    r3 <- res3 
    } yield (r1, r2, r3) 

,後來我用combinedFuture,因爲我認爲合適。

+0

我不明白的是,它是如何'TIMEOUT1 + timeout2 + timeout3'? future1的'timeout1',future2的timeout2等等。問題仍然不清楚 – Jatin

+1

他想要並行運行3個任務,以便超時是三個任務超時的最大值 –

+1

我認爲我回答這個問題的答案類似於你想要的,它也是利用非阻塞回調。 http://stackoverflow.com/questions/16304471/scala-futures-built-in-timeout/16305056#16305056 – cmbaxter

回答

8
def toFallback[T](f: Future[T], to: Int, default: T) = { 
    future{ 
    try{ 
     Await.result(f, to seconds) 
    }catch{ 
     case e:TimeoutException => default 
    } 
} 

您甚至可以使該塊異步,並且每個請求都會等待其最長時間。如果線程太多,可能會有一個線程使用Akka的system scheduler繼續檢查其他期貨。 @Senia在下面回答了這個問題。

+5

'Await.result'阻塞線程,所以你不應該在這裏使用默認的'ExecutionContext'。您可以爲'toFallback'的調用創建一個特殊的'ExecutionContext',甚至可以像[本答案](http://stackoverflow.com/a/17215663/406435)中那樣啓動一個新線程而不是'future'方法。 – senia

12

您可以創建future返回使用flatMap或所有3和期貨的結果-理解:

val combinedFuture = 
    for { 
    r1 <- future1 
    r2 <- future2 
    r3 <- future3 
    } yield (r1, r2, r3) 

val (r1, r2, r3) = Await.result(combinedFuture , Seq(timeout1, timeout2, timeout3).max) 

如果您正在使用akka你可以超時後完成的默認值,你的未來:

implicit class FutureHelper[T](f: Future[T]) extends AnyVal{ 
    import akka.pattern.after 
    def orDefault(t: Timeout, default: => T)(implicit system: ActorSystem): Future[T] = { 
    val delayed = after(t.duration, system.scheduler)(Future.successful(default)) 
    Future firstCompletedOf Seq(f, delayed) 
    } 
} 

val combinedFuture = 
    for { 
    r1 <- future1.orDefault(timeout1, Map()) 
    r2 <- future2.orDefault(timeout2, List()) 
    r3 <- future3.orDefault(timeout3, Map()) 
    } yield (r1, r2, r3) 

val (r1, r2, r3) = Await.result(combinedFuture , allowance + Seq(timeout1, timeout2, timeout3).max) 
+0

這有一個缺陷。說'future1'花了很長時間,但另一個期貨已經完成了,你不會得到任何輸出。 'future2'和'future3'輸出將毫無用處。 – Jatin

+1

@Jatin:你可以用'akka'中的默認值aftre timeout完成你的未來。查看更新。 – senia

2

我會避免使用Await.result,因爲它只是爲了阻塞而使用線程。一個選項來實現超時期貨會是這樣:

val timer = new Timer() 

def toFallback[T](f: Future[T], timeout: Int, default: T) = { 
    val p = Promise[T]() 
    f.onComplete(result => p.tryComplete(result)) 
    timer.schedule(new TimerTask { 
    def run() { 
     p.tryComplete(Success(default)) 
    } 
    }, timeout) 
    p.future 
} 

這將創建要麼由未來或由指定的超時後的默認結果完成一個承諾 - 以先到者爲準。

要運行查詢,同時你會做像這樣:

val future1 = // start future1 
val future2 = // start future2 
val future3 = // start future3 

val res1 = toFallback(future1, timeout1, Map[String, Int]()) 
val res2 = toFallback(future2, timeout2, List[Int]()) 
val res3 = toFallback(future3, timeout3, Map[String, BigInt]()) 

val resultF = for { 
    r1 <- res1 
    r2 <- res2 
    r3 <- res3 
} yield (r1, r2, r3) 

val (r1, r2, r3) = Await.result(resultF, Duration.Inf) 
println(s"$r1, $r2, $r3") 

//or 
resultF.onSuccess { 
    case (r1, r2, r3) => println(s"$r1, $r2, $r3") 
} 
0

爲什麼拿不到Future本身進行異常捕獲和默認的回報?然後,您可以在每個未來依次簡單地使用Await,並且您不必擔心未來的異常處理問題。

0

這可能有點不方便,但您可以簡單地測量已用時間並相應地修改超時。假設timeout1 <= timeout2 <= timeout3

def now  = System.currentTimeMillis(); 
val start = now; 
def remains(timeout: Long): Long 
      = math.max(0, timeout + start - now) 

def toFallback[T](f: Future[T], to: Int, default: T) = { 
    Try(Await.result(f, remains(to) seconds)) 
    .recover { case to: TimeoutException => default } 
} 

這樣每個超時是基於對當下start = now叫,所以整體的運行時間爲至多timeout3。如果超時沒有排除,它仍然有效,但某些任務可能會超過其指定的超時運行時間。

2

這是一個較長的(unakka)答案,它解決了可能的用例,也就是說,如果其中一個值「超時」,您希望使用該結果的默認值並對其執行某些操作(例如取消長時間計算或I/O或其他)。

不用說,另一個故事是儘量減少阻塞。

基本的想法是坐在一個等待firstCompletedOf項目尚未完成的循環。 ready上的超時是最小剩餘超時時間。

此代碼使用期限而不是持續時間,但使用持續時間作爲「剩餘時間」很容易。

import scala.language.postfixOps 
import scala.concurrent._ 
import scala.concurrent.duration._ 
import ExecutionContext.Implicits._ 
import scala.reflect._ 
import scala.util._ 
import java.lang.System.{ nanoTime => now } 

import Test.time 

class Test { 

    type WorkUnit[A] = (Promise[A], Future[A], Deadline, A) 
    type WorkQ[A] = Seq[WorkUnit[A]] 

    def await[A: ClassTag](work: Seq[(Future[A], Deadline, A)]): Seq[A] = { 
    // check for timeout; if using Duration instead of Deadline, decrement here 
    def ticktock(w: WorkUnit[A]): WorkUnit[A] = w match { 
     case (p, f, t, v) if !p.isCompleted && t.isOverdue => p trySuccess v ; w 
     case _ => w 
    } 
    def await0(work: WorkQ[A]): WorkQ[A] = { 
     val live = work filterNot (_._1.isCompleted) 
     val t0 = (live map (_._3)).min 
     Console println s"Next deadline in ${t0.timeLeft.toMillis}" 
     val f0 = Future firstCompletedOf (live map (_._2)) 
     Try(Await ready (f0, t0.timeLeft)) 
     val next = work map (w => ticktock(w)) 
     if (next exists (!_._1.isCompleted)) { 
     await0(next) 
     } else { 
     next 
     } 
    } 
    val wq = work map (_ match { 
     case (f, t, v) => 
     val p = Promise[A] 
     p.future onComplete (x => Console println s"Value available: $x: $time") 
     f onSuccess { 
      case a: A => p trySuccess a // doesn't match on primitive A 
      case x => p trySuccess x.asInstanceOf[A] 
     } 
     f onFailure { case _ => p trySuccess v } 
     (p, f, t, v) 
    }) 
    await0(wq) map (_ match { 
     case (p, f, t, v) => p.future.value.get.get 
    }) 
    } 
} 

object Test { 
    val start = now 
    def time = s"The time is ${ Duration fromNanos (now - start) toMillis }" 

    def main(args: Array[String]): Unit = { 
    // #2 times out 
    def calc(i: Int) = { 
     val t = if (args.nonEmpty && i == 2) 6 else i 
     Thread sleep t * 1000L 
     Console println s"Calculate $i: $time" 
     i 
    } 
    // futures to be completed by a timeout deadline 
    // or else use default and let other work happen 
    val work = List(
     (future(calc(1)), 3 seconds fromNow, 10), 
     (future(calc(2)), 5 seconds fromNow, 20), 
     (future(calc(3)), 7 seconds fromNow, 30) 
    ) 
    Console println new Test().await(work) 
    } 
} 

採樣運行:

[email protected]:~/tmp$ skalac nextcompleted.scala ; skala nextcompleted.Test 
Next deadline in 2992 
Calculate 1: The time is 1009 
Value available: Success(1): The time is 1012 
Next deadline in 4005 
Calculate 2: The time is 2019 
Value available: Success(2): The time is 2020 
Next deadline in 4999 
Calculate 3: The time is 3020 
Value available: Success(3): The time is 3020 
List(1, 2, 3) 
[email protected]:~/tmp$ skala nextcompleted.Test arg 
Next deadline in 2992 
Calculate 1: The time is 1009 
Value available: Success(1): The time is 1012 
Next deadline in 4005 
Calculate 3: The time is 3020 
Value available: Success(3): The time is 3020 
Next deadline in 1998 
Value available: Success(20): The time is 5020 
List(1, 20, 3)