雖然學習阿卡的監管策略,我想出了下面的例子:問父母演員問兒童演員 - 爲什麼這種方法不奏效?
我想父演員(其中有一個自定義的監管策略)以ask
其子角色的一些狀態和結果返回給sender
。對演員的電話也應該是ask
,而不是tell
(只是爲了配合Future
)。監督策略是通過將狀態存儲在兒童演員中並在殺死他們中的一個之後查詢兒童來進行測試。
我想出了下面的測試和實現。我想用pipeTo
模式將兒童的future
組合成一個單一的future
,這將返回到父母的sender
。 但是,這種方法不能按預期工作。我已經確定父母對孩子執行的ask
不會返回預期狀態。
我也試過:
- 通過設置在兒童演員
.withDispatcher(CallerThreadDispatcher.Id)
只使用單一的調度員 - 找回孩子的狀態同步使用
Await.result(future, timeout)
但沒有辦法的幫助。我如何使我的代碼按預期工作?是否有可以改進的任何其他地區(以兒童演員設置人工的國家例如像只知道,他們已經重新啓動?)
SupervisorStrategiesTest:
package org.skramer.learn.supervisorStrategies
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, AllForOneStrategy, DeadLetter, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.ask
import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestKit, TestProbe}
import akka.util.Timeout
import org.scalatest.{Matchers, WordSpecLike}
import org.skramer.learn.AkkaSystemClosing
import org.skramer.learn.supervisorStrategies.StateHoldingActor.{ActorThrowCommand, AddStateCommand, GetStateCommand}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
class SupervisorStrategiesTest extends TestKit(ActorSystem("testSystem")) with WordSpecLike with Matchers with ImplicitSender with AkkaSystemClosing {
import StateHoldingActor._
"actor with custom supervision strategy" should {
"apply the strategy to a single child" in {
implicit val timeout: Timeout = 3 seconds
val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor)))
val initialStateFuture = parentActor ? "state"
val initialState = Await.result(initialStateFuture, timeout.duration)
initialState shouldBe List(Vector(), Vector())
parentActor ! ("first", AddStateCommand(1))
parentActor ! ("second", AddStateCommand(2))
val currentStateFuture = parentActor ? "state"
val currentState = Await.result(currentStateFuture, timeout.duration)
currentState shouldBe List(Vector(1), Vector(2))
parentActor ! "throwFirst"
val stateAfterRestartFuture = parentActor ? "state"
val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration)
stateAfterRestart shouldBe List(Vector(), Vector(2))
}
"apply the strategy to all children" in {
implicit val timeout: Timeout = 3 seconds
val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor)))
val initialStateFuture = parentActor ? "state"
val initialState = Await.result(initialStateFuture, timeout.duration)
initialState shouldBe List(Vector(), Vector())
parentActor ! ("first", AddStateCommand(1))
parentActor ! ("second", AddStateCommand(2))
val currentStateFuture = parentActor ? "state"
val currentState = Await.result(currentStateFuture, timeout.duration)
currentState shouldBe List(Vector(1), Vector(2))
parentActor ! "throwFirst"
val stateAfterRestartFuture = parentActor ? "state"
val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration)
stateAfterRestart shouldBe List(Vector(), Vector())
}
}
}
StateHoldingActor:
object StateHoldingActor {
case class ActorThrowCommand()
case class AddStateCommand(stateElement: Int)
case class GetStateCommand()
case class GetStateCommandWithResponse()
def props(receiver: ActorRef): Props = Props(new StateHoldingActor())
}
class StateHoldingActor() extends Actor with ActorLogging {
log.info("about to create state")
private var state = Vector[Int]()
log.info(s"state created: $state")
import StateHoldingActor._
override def receive: Receive = {
case AddStateCommand(i) =>
log.info(s"extending state: $state")
state = i +: state
log.info(s"extended state: $state")
case GetStateCommand() =>
log.info(s"returning state: $state")
sender ! state
case GetStateCommandWithResponse() =>
log.info(s"returning state in response: $state")
sender ! state
case _: ActorThrowCommand =>
log.info(s"throwing exception with state: $state")
throw new IllegalStateException("Should crash actor instance and restart state")
}
}
ParentActor:
abstract class ParentActor(recipient: ActorRef) extends Actor with ActorLogging {
log.info("creating children")
private val stateHoldingActor1 = context
.actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id))
private val stateHoldingActor2 = context
.actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id))
log.info("children created")
implicit val timeout: Timeout = 3 seconds
import scala.concurrent.ExecutionContext.Implicits.global
override def receive: Receive = {
case "throwFirst" =>
log.info("stateHoldingActor1 ! ActorThrowCommand")
stateHoldingActor1 ! ActorThrowCommand
case "throwSecond" =>
log.info("stateHoldingActor1 ! ActorThrowCommand")
stateHoldingActor2 ! ActorThrowCommand
case "state" =>
log.info("gathering states")
val futureResults: Future[List[Any]] = Future
.sequence(List(stateHoldingActor1 ? GetStateCommand, stateHoldingActor2 ? GetStateCommand))
import akka.pattern.pipe
futureResults pipeTo sender()
case ("first", [email protected](_)) => stateHoldingActor1 forward msg
case ("second", [email protected](_)) => stateHoldingActor2 forward msg
}
}
OneForOneParentActor:
class OneForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) {
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
case _ => Restart
}
}
allForOneParentActor:
class AllForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) {
override def supervisorStrategy: SupervisorStrategy = AllForOneStrategy() {
case _ => Restart
}
}
感謝您指出缺少的圓括號,我也沒有意識到case對象的屬性。 然而,請您詳細說明您看到的比賽嗎?我設法讓我的測試通過只添加缺少的括號,不需要等待。 此外,我不能讓你的測試通過 - 我不斷讓List(Vector(),Vector())不等於List(Vector(),Vector(4))''。你可以仔細檢查一下嗎? ParentActor的'state'接收事件中的'ask'是否處理了比賽的問題(即,因爲第一個參與者可以重新啓動,所以第二個參與者不應該準備好呢?)? –
我會再次檢查,當我有機會(從現在兩天)。 – thwiegan
@ SebastianKramer爲了確保,我們運行的是相同的設置,您使用的是哪個版本(我正在運行akka 2.5.2)?如果我運行你的測試(沒有任何延遲),我得到'List(Vector(),Vector(4))不等於List(Vector(),Vector())'。哪個btw。也暗示了一場比賽,因爲它不是100%可重複的,首先會發生什麼。也可能依賴於機器時序等。 – thwiegan