我有一個演員(稱爲工人),其發送相同的消息給其他3個演員(稱爲過濾器1,過濾器2,過濾器3)阿卡和併發演員執行
每個這種過濾器具有隨機的時間來解決這個動作。然後,在工人的演員,我用的是問格局,等待未來的成功:
class Worker2 extends Actor with ActorLogging {
val filter1 = context.actorOf(Props[Filter1], "filter1")
val filter2 = context.actorOf(Props[Filter2], "filter2")
val filter3 = context.actorOf(Props[Filter3], "filter3")
implicit val timeout = Timeout(100.seconds)
def receive = {
case Work(t) =>
val futureF3 = (filter3 ? Work(false)).mapTo[Response]
val futureF2 = (filter2 ? Work(true)).mapTo[Response]
val futureF1 = (filter1 ? Work(true)).mapTo[Response]
val aggResult: Future[Boolean] =
for {
f3 <- futureF3
f2 <- futureF2
f1 <- futureF1
} yield f1.reponse && f2.reponse && f3.reponse
if (Await.result(aggResult, timeout.duration)) {
log.info("Response: true")
sender ! Response(true)
} else {
log.info("Response: false")
sender ! Response(false)
}
}
}
如果任何過濾演員返回false,那麼我不需要其他的答案。例如,如果我並行運行3個Filter Actors,如果在一種情況下Filter1響應爲false,則Work解決,我不需要Filter2和Filter3的答案。
在這段代碼中,我總是需要等待3個處決決定,這似乎是不必要的。有沒有辦法建立一個短路?
+1。這是一個堅實的解決方案。請記住,其他期貨仍然會完成,您不必等待它們。 – cmbaxter
超凡!它工作得很好。對於其他未來,我正在考慮取消任務+他們的優先收件箱。我的意思是,在使用false解析之後,我向所有actor發送一個帶有任務ID的Cancel消息。由於此消息具有最高優先級,因此如果未處理該消息,我將以Actor狀態存儲任務ID。然後,當任務到達時,我檢查這個ID。如果存在,則完全忽略。這樣,我可以避免執行昂貴的任務 – German