2013-07-25 90 views
3

我有一個演員(稱爲工人),其發送相同的消息給其他3個演員(稱爲過濾器1,過濾器2,過濾器3)阿卡和併發演員執行

每個這種過濾器具有隨機的時間來解決這個動作。然後,在工人的演員,我用的是問格局,等待未來的成功:

class Worker2 extends Actor with ActorLogging { 

    val filter1 = context.actorOf(Props[Filter1], "filter1") 
    val filter2 = context.actorOf(Props[Filter2], "filter2") 
    val filter3 = context.actorOf(Props[Filter3], "filter3") 

    implicit val timeout = Timeout(100.seconds) 

    def receive = { 
    case Work(t) => 

     val futureF3 = (filter3 ? Work(false)).mapTo[Response] 
     val futureF2 = (filter2 ? Work(true)).mapTo[Response] 
     val futureF1 = (filter1 ? Work(true)).mapTo[Response] 

     val aggResult: Future[Boolean] = 
     for { 
      f3 <- futureF3 
      f2 <- futureF2 
      f1 <- futureF1 
     } yield f1.reponse && f2.reponse && f3.reponse 

     if (Await.result(aggResult, timeout.duration)) { 
     log.info("Response: true") 
     sender ! Response(true) 
     } else { 
     log.info("Response: false") 
     sender ! Response(false) 
     } 
    } 
} 

如果任何過濾演員返回false,那麼我不需要其他的答案。例如,如果我並行運行3個Filter Actors,如果在一種情況下Filter1響應爲false,則Work解決,我不需要Filter2和Filter3的答案。

在這段代碼中,我總是需要等待3個處決決定,這似乎是不必要的。有沒有辦法建立一個短路?

回答

7

的解決方案這個問題是使用Future.find() - Scaladoc Here

你可以解決這個問題是這樣的:

val failed = Future.find([f1,f2,f3]) { res => !res } 
Await.result(failed, timeout.duration) match { 
    None => // Success 
    _ => // Failed 
} 

Future.find()將返回完成並匹配謂詞的第一個將來。如果所有期貨都已完成並且沒有結果與謂詞匹配,則返回無。

編輯:

更好的解決方案將是,以防止阻塞所有在一起,並當響應被發現直接使用阿卡管功能管的結果給發件人。這樣,你不阻止使用該演員線程:

import akka.pattern.pipe 

val failed = Future.find([f1,f2,f3]) { res => !res } 
val senderRef = sender 
failed.map(res => Response(res.getOrElse(true))).pipeTo(senderRef) 

在getOrElse(真)部分的結果是假的,如果我們找到了一個未來就像之前否則我們返回true。

+0

+1。這是一個堅實的解決方案。請記住,其他期貨仍然會完成,您不必等待它們。 – cmbaxter

+0

超凡!它工作得很好。對於其他未來,我正在考慮取消任務+他們的優先收件箱。我的意思是,在使用false解析之後,我向所有actor發送一個帶有任務ID的Cancel消息。由於此消息具有最高優先級,因此如果未處理該消息,我將以Actor狀態存儲任務ID。然後,當任務到達時,我檢查這個ID。如果存在,則完全忽略。這樣,我可以避免執行昂貴的任務 – German

1

我想你想要的是過濾未來如果響應是真實的。由於表達方式的作用,它會短路,並且不會等待期貨的其餘部分完成以便組裝響應。它仍然會返回失敗的未來與MatchError異常(每[1]),你需要使用onFailure處處理

所以

val aggResult = for { 
    f3 <- futureF3 if (f3.response) 
    f2 <- futureF2 if (f2.response) 
    f1 <- futureF1 if (f1.response) 
} yield f1.reponse && f2.reponse && f3.reponse 

aggResult.onFailure { case MatchError => sender ! false } 

[1]來處理:https://groups.google.com/forum/#!msg/akka-user/oCBpAMRekks/X4y0QV-oOPYJ

+0

不完全是......這是更好地看到它在對產量的一部分。如果其中一個是錯誤的,那麼等待其他人是沒有意義的,因爲答案是錯誤的!無論如何,它比我的代碼好得多!:) – German

+0

Fair'nuf。布賴恩的解決方案要好得多。 – rjsvaljean