2012-12-03 70 views
5

我有一個Akka actor負責處理http調用。我用斯卡拉派遣了一個API發送多個HTTP請求:如何處理(akka)演員中的多個承諾?

urls.foreach { u 
    val service = url(u) 
    val promise = Http(service OK as.String).either 
    for(p <- promise) 
    { 
    p match 
    { 
     case Left(error) => 
     faultHandler(error) 
     case Right(result) => 
     resultHandler(result) 
    } 
    } 

resultHandler功能,我遞增變量nbOfResults一個實例,並比較呼叫我做的次數。

def resultHandler(result:String) 
{ 
    this.nbOfResults++ 
    ... 
    if(nbOfResults == nbOfCalls) 
    // Do something 
} 

安全嗎?如果兩個調用同時返回結果,可能同時訪問變量nbOfResults

現在,我相信演員或多或少地等同於一個線程,因此回調函數不會同時執行。這是對的嗎 ?

+0

的答案包含關於如何做到這一點的建議,我只是想備案明確指出,是的,你需要注意異步回調,他們將被同時執行。因此,總之,在上面的代碼中處理nbOfResults是不正確的。 –

回答

3

這裏是只使用派遣阿列克謝·羅曼諾夫響應的變體:下面

//Promises will be of type Array[Promise[Either[Throwable, String]]] 
val promises = urls.map { u => 
    val service = url(u) 

    Http(service OK as.String).either 
} 

//Http.promise.all transform an Iterable[Promise[A]] into Promise[Iterable[A]] 
//So listPromise is now of type Promise[Array[Either[Throwable, String]]] 
val listPromise = Http.promise.all(promises) 

for (results <- listPromise) { 
    //Here results is of type Array[Either[Throwable, String]] 

    results foreach { result => 
     result match { 
      Left(error) => //Handle error 
      Right(response) => //Handle response 
     } 
    } 
} 
2

有一個更好的方法:

val promises = urls.map {u => 
    val service = url(u) 
    val promise = Http(service OK as.String).either 
} 

val listPromise = Future.sequence(promises) 

listPromise.onComplete { whatever } 
2

我對他的回答與阿列克謝·羅曼諾夫同意。無論您選擇如何同步您的http請求,請注意您處理承諾完成的方式。你的直覺是正確的,因爲併發訪問可能出現在actor的狀態上。更好的方式來處理這將是做這樣的事情:

def resultHandler(result: String) { 
    //on completion we are sending the result to the actor who triggered the call 
    //as a message 
    self ! HttpComplete(result) 
} 

,並在演員的接收功能:

def receive = { 
    //PROCESS OTHER MESSAGES HERE 
    case HttpComplete(result) => //do something with the result 
} 

這樣,你確保處理HTTP結果不會違反演員的狀態下,從外部,而是從演員的接收回路這與CAS的AtomicReference做

1
val nbOfResults = new java.util.concurrent.atomic.AtomicInteger(nbOfCalls) 

// After particular call was ended  
if (nbOfResults.decrementAndGet <= 0) { 
    // Do something 
} 

[編輯]刪除舊的答案正確的方式 - 而(真),compareAndSet等

+0

incrementAndGet有什麼問題? –

+0

考慮到建議,增加了一個答案的變體 – idonnie