2014-04-29 67 views
1

我想做大量的外部服務調用,每個跟進了異常處理和有條件的進一步處理。我認爲使用.onComplete內部擴展這個很好的示例(Asynchronous IO in Scala with futures)很容易,但似乎我並不瞭解有關範圍和/或期貨的內容。任何人都可以指出我正確的方向嗎?異步處理使用列表的斯卡拉期貨與onComplete異常處理

#!/bin/bash 
scala -feature $0 [email protected] 
exit 
!# 

import scala.concurrent.{future, blocking, Future, Await} 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.duration._ 
import scala.util.{Success, Failure} 
import scala.language.postfixOps 

val keylist = List("key1", "key2") 

val myFuts: List[Future[String]] = keylist.map { 
    myid => future { 
    // this line simulates an external call which returns a future (retrieval from S3) 
    val myfut = future { Thread.sleep(1); "START " + myid} 

    var mystr = "This should have been overwritten" 
    myfut.onComplete { 
     case Failure(ex) => { 
     println (s"failed with error: $ex") 
     mystr = "FAILED" 
     } 
     case Success(myval) => { 
     mystr = s"SUCCESS $myid: $myval" 
     println (mystr) 
     } 
    } 
    mystr 
    } 
} 

val futset: Future[List[String]] = Future.sequence(myFuts) 
println (Await.result(futset, 10 seconds)) 
我的電腦(斯卡拉2.10.4)上

,這個打印:

SUCCESS key2: START key2 
SUCCESS key1: START key1 
List(This should have been overwritten, This should have been overwritten) 

我想(順序不重要):

SUCCESS key2: START key2 
SUCCESS key1: START key1 
List(SUCCESS key2: START key2, SUCCESS key1: START key1) 

回答

3

我會避免使用onComplete,並試圖用它做一個可變的變量副作用的邏輯。相反,我會映射未來並處理失敗案例作爲返回不同的值。這裏有一個稍微修改的代碼版本,在Future上使用map(通過理解),然後使用recover來處理故障情況。希望這是你正在尋找的:

val keylist = List("key1", "key2") 

val myFuts: List[Future[String]] = keylist.map {myid => 

    // this line simulates an external call which returns a future (retrieval from S3) 
    val myfut = future { Thread.sleep(1); "START " + myid} 
    val result = for (myval <- myfut) yield { 
    val res = s"SUCCESS $myid: $myval" 
    println(res) 
    res 
    } 
    result.recover{ 
    case ex => 
     println (s"failed with error: $ex") 
     "FAILED"   
    } 

} 

val futset: Future[List[String]] = Future.sequence(myFuts) 
println (Await.result(futset, 10 seconds)) 
+0

工程很好。 .recover方法是該模式的關鍵。我沒有意識到可以做到這一點。非常感謝! –

+0

對於那些感興趣的人,這個答案和更有效的替代方法在http://stackoverflow.com/a/15776974/29771中給出。 – Glenn

2

在完全不返回一個新的未來,它只是讓你在未來完成時做些事情。因此,在執行onComplete之前,您的第一個Future塊會返回,因此您將返回字符串的原始值。

我們所能做的就是用諾言回報未來,而未來是通過第一個未來的結果完成的。

val keylist = List("key1", "key2") 

    val myFuts: List[Future[String]] = keylist.map { 
    myid => { 
     // this line simulates an external call which returns a future (retrieval from S3) 
     val myfut = Future { 
     Thread.sleep(1); "START " + myid 
     } 
     var mystr = "This should have been overwritten" 
     val p = Promise[String]() 
     myfut.onComplete { 
     case Failure(ex) => 
      println(s"failed with error: $ex") 
      mystr = "FAILED" 
      p failure ex 
     case Success(myval) => 
      mystr = s"SUCCESS $myid: $myval" 
      println(mystr) 
      p success myval 
     } 
     p.future 
    } 
    } 

    val futset: Future[List[String]] = Future.sequence(myFuts) 
    println(Await.result(futset, 10 seconds)) 

會超靈便這將是一個mapAll方法,我問了一下這裏: Map a Future for both Success and Failure

+0

它超時出現錯誤,當我運行它,但有趣的解決方案。我不明白'p'Promise是如何完成的,並且有與任務相關的價值。感謝您的幫助 –

+0

哎呀是的,補充說。 – monkjack

+0

現在工作很好。很高興看到使用Promise的解決方案!我一直在想,除了競爭性完成以外,他們在哪裏有用。 –