2016-12-06 29 views
1

假設我有以下代碼:Scala Future和TimeoutException:如何知道根本原因?

val futureInt1 = getIntAsync1(); 
val futureInt2 = getIntAsync2(); 

val futureSum = for { 
    int1 <- futureInt1 
    int2 <- futureInt2 
} yield (int1 + int2) 

val sum = Await.result(futureSum, 60 seconds) 

現在假設的getIntAsync1getIntAsync2一個需要很長的時間,它會導致Await.result拋出異常:

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60 seconds] 

我怎麼知道哪一個getIntAsync1getIntAsync2仍然未決,實際上導致超時?

請注意,我在這裏匯合2個期貨拉鍊,這是問題的一個簡單的例子,但在我的應用我有這種代碼在不同的水平(即getIntAsync1本身可以使用Future.zipFuture.sequence,地圖/ flatMap/applicative)

不知何故,當我的主線程發生超時時,我能夠記錄掛起的併發操作堆棧跟蹤,以便我可以知道我的系統上的瓶頸在哪裏。


我有一個現有的遺留API後端,這是不完全反應,但不會這麼快。我試圖通過使用併發來增加響應時間。但是,自從使用這種代碼之後,理解爲什麼在我的應用程序中需要花費很多時間纔會變得更加痛苦。如果您能提供幫助我調試此類問題的任何提示,我將不勝感激。

+0

什麼是結果? –

+0

我認爲它應該是'futureSum' –

+0

什麼時候既不是int1也不是int2,而是通過收益創造的新未來 – Lambder

回答

1

的關鍵是實現是一個未來不會在你的時間了例子,這是你的調用線程暫停其在大多數x時間。

因此,如果您想在您的期貨中建立時間,您應該在每個分行上使用zipWith,並在Future中使用zip,並在一定時間內包含一個值。如果你使用阿卡,那麼你可以使用akka.pattern.after來做這件事,以及Future.firstCompletedOf。

現在,即使你這樣做,你如何計算出爲什麼你的期貨沒有及時完成,也許他們依賴於其他未完成的期貨。

問題歸結爲:你是否試圖對吞吐量做一些根本原因分析?那麼你應該監視你的ExecutionContext,而不是你的期貨。期貨只是價值。

+0

我會可以在爭用發生時進行線程轉儲,有沒有辦法在適當的時候觸發線程轉儲?就像達到歐共體限制時一樣? –

+0

如果您編寫自定義EC,您絕對可以這麼做。 –

+0

嗯以前從來沒有做過,但可能會嘗試是的! –

0

您可以檢查是否有未來已經通過調用isComplete方法

if (futureInt1.isComplete) { /*futureInt2 must be the culprit */ } 
if (futureInt2.isComplete) { /*futureInt1 must be the culprit */ } 
+0

除了當我們執行這兩行時,兩個期貨都可以作爲代碼完成並且以前的等待操作不是原子的 – Lambder

+0

是真的。另一種選擇是等待每個單獨未來的結果:val futureInt1 =未來{Await.result(getIntAsync1(),60秒)}並使用理解這些期貨來代替。 –

+0

這會稍微改變語義,因爲我們不再等待60個語義。 – Lambder

0

作爲第一種方法,我建議解除您的未來[INT]爲未來完成[嘗試[INT]]。類似的東西:

object impl { 

    def checkException[T](in: Future[T]): Future[Try[T]] = 
    in.map(Success(_)).recover { 
     case e: Throwable => { 
     Failure(new Exception("Error in future: " + in)) 
     } 
    } 

    implicit class FutureCheck(s: Future[Int]) { 
    def check = checkException(s) 
    } 
} 

然後一個小功能的結果結合起來,這樣的事情:

object test { 

    import impl._ 

    val futureInt1 = Future{ 1 } 
    val futureInt2 = Future{ 2 } 

    def combine(a: Try[Int], b: Try[Int])(f: (Int, Int) => (Int)) : Try[Int] = { 
    if(a.isSuccess && b.isSuccess) { 
     Success(f(a.get, b.get)) 
    } 
    else 
    Failure(new Exception("Error adding results")) 
    } 

    val futureSum = for { 
    int1 <- futureInt1.check 
    int2 <- futureInt2.check 
    } yield combine(int1, int2)(_ + _) 
} 

在futureSum你將有一個嘗試[INT]與整數或與異常的故障與可能的錯誤相對應。

也許這可以是有用的

+0

這個例子是未來故障處理的一個很好的例子。但我不認爲超時是未來的失敗,所以不幸的是,它並沒有回答原來的問題。 – Lambder

+0

是的,我不明白它是如何回答我的問題 –

0
val futureInt1 = getIntAsync1(); 
val futureInt2 = getIntAsync2(); 

val futureSum = for { 
    int1 <- futureInt1 
    int2 <- futureInt2 
} yield (int1 + int2) 

Try(Await.result(futureSum, 60 seconds)) match { 
    case Success(sum) => println(sum) 
    case Failure(e) => println("we got timeout. the unfinished futures are: " + List(futureInt1, futureInt2).filter(!_.isCompleted) 
} 
+0

如果futureInt1已經是其他期貨的組成部分,該怎麼辦?我想知道哪些可能深層嵌套的未來需要很長時間,而不僅僅是在最高層次的未來構成 –

1

提出的解決方案從塊進入TimelyFuture這需要超時而名稱包裹每前途。它在內部使用「等待」來檢測單個超時。 請注意,使用期貨的這種風格不適用於生產代碼,因爲它使用阻塞。僅用於診斷才能確定哪些期貨需要花費時間才能完成。

package xxx 

import java.util.concurrent.TimeoutException 

import scala.concurrent.{Future, _} 
import scala.concurrent.duration.Duration 
import scala.util._ 
import scala.concurrent.duration._ 

class TimelyFuture[T](f: Future[T], name: String, duration: Duration) extends Future[T] { 

    override def onComplete[U](ff: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = f.onComplete(x => ff(x)) 

    override def isCompleted: Boolean = f.isCompleted 

    override def value: Option[Try[T]] = f.value 

    @scala.throws[InterruptedException](classOf[InterruptedException]) 
    @scala.throws[TimeoutException](classOf[TimeoutException]) 
    override def ready(atMost: Duration)(implicit permit: CanAwait): TimelyFuture.this.type = { 
    Try(f.ready(atMost)(permit)) match { 
     case Success(v) => this 
     case Failure(e) => this 
    } 
    } 

    @scala.throws[Exception](classOf[Exception]) 
    override def result(atMost: Duration)(implicit permit: CanAwait): T = { 
    f.result(atMost) 
    } 

    override def transform[S](ff: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] = { 
    val p = Promise[S]() 
    Try(Await.result(f, duration)) match { 
     case [email protected](_) => ff(s) match { 
     case Success(v) => p.success(v) 
     case Failure(e) => p.failure(e) 
     } 
     case Failure(e) => e match { 
     case e: TimeoutException => p.failure(new RuntimeException(s"future ${name} has timed out after ${duration}")) 
     case _ => p.failure(e) 
     } 
    } 
    p.future 
    } 

    override def transformWith[S](ff: (Try[T]) => Future[S])(implicit executor: ExecutionContext): Future[S] = { 
    val p = Promise[S]() 
    Try(Await.result(f, duration)) match { 
     case [email protected](_) => ff(s).onComplete({ 
     case Success(v) => p.success(v) 
     case Failure(e) => p.failure(e) 
     }) 
     case Failure(e) => e match { 
     case e: TimeoutException => p.failure(new RuntimeException(s"future ${name} has timed out after ${duration}")) 
     case _ => p.failure(e) 
     } 
    } 
    p.future 
    } 
} 

object Main { 

    import scala.concurrent.ExecutionContext.Implicits.global 

    def main(args: Array[String]): Unit = { 
    val f = Future { 
     Thread.sleep(5); 
     1 
    } 

    val g = Future { 
     Thread.sleep(2000); 
     2 
    } 

    val result: Future[(Int, Int)] = for { 
     v1 <- new TimelyFuture(f, "f", 10 milliseconds) 
     v2 <- new TimelyFuture(g, "g", 10 milliseconds) 
    } yield (v1, v2) 


    val sum = Await.result(result, 1 seconds) // as expected, this throws exception : "RuntimeException: future g has timed out after 10 milliseconds" 
    } 
} 
+0

,這似乎也是一級未來組合的有效解決方案,但不會真正處理未來組合層次的深度嵌套水平。 –

+0

@SebastienLorber更新到更簡單的解決方案 – Lambder

+0

嗯,從用戶的角度來看似乎更簡單,可以完成這項工作。但是,我看到一些限制。如果我將TimelyFuture等待時間增加到1分鐘,並且全局總和只等待1秒,我仍然不知道什麼是混亂,所以我必須確保我的並行計算樹總是有較低的超時對於更深的葉子(但是無論如何這都是有意義的,只是難以維持) –

1

如果你只是尋找信息的度量上個人的未來是需要很長的時間(或與他人相結合),最好的辦法是創建期貨時使用的包裝記錄的指標:

object InstrumentedFuture { 
     def now() = System.currentTimeMillis() 
     def apply[T](name: String)(code: => T): Future[T] = { 
      val start = now() 
      val f = Future { 
      code 
      } 
      f.onComplete { 
       case _ => println(s"Future ${name} took ${now() - start} ms") 
      } 
      f 
     } 
    } 


    val future1 = InstrumentedFuture("Calculator") { /*...code...*/ } 
    val future2 = InstrumentedFuture("Differentiator") { /*...code...*/ } 
+0

這似乎是一個體面和簡單的想法:)可能不會完全解決我的問題,但可以幫助調試大多數簡單情況 –

+0

建議的解決方案有幾個錯誤。我們可以使用下面的代碼來實現這個功能:'{(s = System.nanoTime();(Try(code),System.nanoTime()s)})transform {case Success((r,t))=> {println( s「Future $ {name}花費$ t ns」); r}}' –

相關問題