2017-06-05 100 views
1

雖然學習阿卡的監管策略,我想出了下面的例子:問父母演員問兒童演員 - 爲什麼這種方法不奏效?

我想父演員(其中有一個自定義的監管策略)以ask其子角色的一些狀態和結果返回給sender。對演員的電話也應該是ask,而不是tell(只是爲了配合Future)。監督策略是通過將狀態存儲在兒童演員中並在殺死他們中的一個之後查詢兒童來進行測試。

我想出了下面的測試和實現。我想用pipeTo模式將兒童的future組合成一個單一的future,這將返回到父母的sender。 但是,這種方法不能按預期工作。我已經確定父母對孩子執行的ask不會返回預期狀態。

我也試過:

  • 通過設置在兒童演員.withDispatcher(CallerThreadDispatcher.Id)只使用單一的調度員
  • 找回孩子的狀態同步使用Await.result(future, timeout)

但沒有辦法的幫助。我如何使我的代碼按預期工作?是否有可以改進的任何其他地區(以兒童演員設置人工的國家例如像只知道,他們已經重新啓動?)

SupervisorStrategiesTest:

package org.skramer.learn.supervisorStrategies 

import akka.actor.SupervisorStrategy.Restart 
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, AllForOneStrategy, DeadLetter, OneForOneStrategy, Props, SupervisorStrategy} 
import akka.pattern.ask 
import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestKit, TestProbe} 
import akka.util.Timeout 
import org.scalatest.{Matchers, WordSpecLike} 
import org.skramer.learn.AkkaSystemClosing 
import org.skramer.learn.supervisorStrategies.StateHoldingActor.{ActorThrowCommand, AddStateCommand, GetStateCommand} 

import scala.concurrent.duration.DurationInt 
import scala.concurrent.{Await, Future} 

class SupervisorStrategiesTest extends TestKit(ActorSystem("testSystem")) with WordSpecLike with Matchers with ImplicitSender with AkkaSystemClosing { 

    import StateHoldingActor._ 

    "actor with custom supervision strategy" should { 
    "apply the strategy to a single child" in { 
     implicit val timeout: Timeout = 3 seconds 

     val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor))) 

     val initialStateFuture = parentActor ? "state" 
     val initialState = Await.result(initialStateFuture, timeout.duration) 
     initialState shouldBe List(Vector(), Vector()) 

     parentActor ! ("first", AddStateCommand(1)) 
     parentActor ! ("second", AddStateCommand(2)) 

     val currentStateFuture = parentActor ? "state" 
     val currentState = Await.result(currentStateFuture, timeout.duration) 
     currentState shouldBe List(Vector(1), Vector(2)) 

     parentActor ! "throwFirst" 

     val stateAfterRestartFuture = parentActor ? "state" 
     val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration) 
     stateAfterRestart shouldBe List(Vector(), Vector(2)) 
    } 

    "apply the strategy to all children" in { 
     implicit val timeout: Timeout = 3 seconds 

     val parentActor = system.actorOf(Props(new OneForOneParentActor(testActor))) 

     val initialStateFuture = parentActor ? "state" 
     val initialState = Await.result(initialStateFuture, timeout.duration) 
     initialState shouldBe List(Vector(), Vector()) 

     parentActor ! ("first", AddStateCommand(1)) 
     parentActor ! ("second", AddStateCommand(2)) 

     val currentStateFuture = parentActor ? "state" 
     val currentState = Await.result(currentStateFuture, timeout.duration) 
     currentState shouldBe List(Vector(1), Vector(2)) 

     parentActor ! "throwFirst" 

     val stateAfterRestartFuture = parentActor ? "state" 
     val stateAfterRestart = Await.result(stateAfterRestartFuture, timeout.duration) 
     stateAfterRestart shouldBe List(Vector(), Vector()) 
    } 
    } 


} 

StateHoldingActor:

object StateHoldingActor { 

    case class ActorThrowCommand() 

    case class AddStateCommand(stateElement: Int) 

    case class GetStateCommand() 

    case class GetStateCommandWithResponse() 

    def props(receiver: ActorRef): Props = Props(new StateHoldingActor()) 
} 

class StateHoldingActor() extends Actor with ActorLogging { 
    log.info("about to create state") 
    private var state = Vector[Int]() 
    log.info(s"state created: $state") 

    import StateHoldingActor._ 

    override def receive: Receive = { 
    case AddStateCommand(i) => 
     log.info(s"extending state: $state") 
     state = i +: state 
     log.info(s"extended state: $state") 
    case GetStateCommand() => 
     log.info(s"returning state: $state") 
     sender ! state 
    case GetStateCommandWithResponse() => 
     log.info(s"returning state in response: $state") 
     sender ! state 
    case _: ActorThrowCommand => 
     log.info(s"throwing exception with state: $state") 
     throw new IllegalStateException("Should crash actor instance and restart state") 

    } 

} 

ParentActor:

abstract class ParentActor(recipient: ActorRef) extends Actor with ActorLogging { 
    log.info("creating children") 
    private val stateHoldingActor1 = context 
            .actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id)) 
    private val stateHoldingActor2 = context 
            .actorOf(Props(new StateHoldingActor()).withDispatcher(CallingThreadDispatcher.Id)) 
    log.info("children created") 

    implicit val timeout: Timeout = 3 seconds 

    import scala.concurrent.ExecutionContext.Implicits.global 

    override def receive: Receive = { 
    case "throwFirst" => 
     log.info("stateHoldingActor1 ! ActorThrowCommand") 
     stateHoldingActor1 ! ActorThrowCommand 
    case "throwSecond" => 
     log.info("stateHoldingActor1 ! ActorThrowCommand") 
     stateHoldingActor2 ! ActorThrowCommand 
    case "state" => 
     log.info("gathering states") 
     val futureResults: Future[List[Any]] = Future 
              .sequence(List(stateHoldingActor1 ? GetStateCommand, stateHoldingActor2 ? GetStateCommand)) 
     import akka.pattern.pipe 
     futureResults pipeTo sender() 

    case ("first", [email protected](_)) => stateHoldingActor1 forward msg 
    case ("second", [email protected](_)) => stateHoldingActor2 forward msg 
    } 
} 

OneForOneParentActor:

class OneForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) { 
    override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { 
    case _ => Restart 
    } 
} 

allForOneParentActor:

class AllForOneParentActor(recipient: ActorRef) extends ParentActor(recipient) { 
    override def supervisorStrategy: SupervisorStrategy = AllForOneStrategy() { 
    case _ => Restart 
    } 
} 

回答

3

你可以聲明你的無參數的消息爲case類(帶括號),但你的ParentActor實現發送不帶括號的,因此只能發送類型,而不是一個實際的例子。這意味着StateHoldingActor中的接收方法(查找實例)將不匹配,並且ask不會返回。

例如stateHoldingActor1 ? GetStateCommand(), stateHoldingActor2 ? GetStateCommand()而不是stateHoldingActor1 ? GetStateCommand, stateHoldingActor2 ? GetStateCommand

解決此問題後,您的第一個測試應該貫穿始終。將消息對象用於消息可能是一個好主意,它不需要參數。然後這不會再發生。

雖然第二個測試仍然失敗。其中一個原因可能是您仍然在第二次測試中使用OneForOneParentActor,您可能想測試AllForOneParentActor。我正在處理另一個原因;)發佈這個答案,以便您也可以查看其他問題。

EDIT

第二測試失敗的僅僅是因爲爭用條件。當最後一次請求狀態(stateAfterRestartFuture)時,由於異常,第一個actor已經失敗,但第二個actor還沒有重新啓動(在「throwFirst」之後添加Thread.sleep來測試)。

EDIT2

我創建的代碼GitHub的庫我用來測試/修復:https://github.com/thwiegan/so_ActorSupervisionTest

EDIT3

在回答您的意見,這裏是發生了什麼,當我運行第二從我的GitHub代碼測試:

[INFO] [06/19/2017 10:32:07.734] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b] creating children 
[INFO] [06/19/2017 10:32:07.735] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b] children created 
[INFO] [06/19/2017 10:32:07.735] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b] gathering states 
[INFO] [06/19/2017 10:32:07.736] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$a] returning state: Vector() 
[INFO] [06/19/2017 10:32:07.736] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b/$b] returning state: Vector() 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] gathering states 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$a] extended state: Vector(3) 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$b] extended state: Vector(4) 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$a] returning state: Vector(3) 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$b] returning state: Vector(4) 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] stateHoldingActor1 ! ActorThrowCommand 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] gathering states 
[INFO] [06/19/2017 10:32:07.737] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/$b/$a] throwing exception with state: Vector(3) 
[INFO] [06/19/2017 10:32:07.738] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$b] returning state: Vector(4) 
[INFO] [06/19/2017 10:32:07.741] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b] Children crashed 
[ERROR] [06/19/2017 10:32:07.741] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b/$a] Should crash actor instance and restart state 
java.lang.IllegalStateException: Should crash actor instance and restart state 
[INFO] [06/19/2017 10:32:07.752] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$a] About to restart actor with state: Vector(3) 
[INFO] [06/19/2017 10:32:07.753] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b/$b] About to restart actor with state: Vector(4) 
[INFO] [06/19/2017 10:32:07.753] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$a] returning state: Vector() 
[INFO] [06/19/2017 10:32:07.753] [testSystem-akka.actor.default-dispatcher-6] [akka://testSystem/user/$b] gathering states 
[INFO] [06/19/2017 10:32:07.754] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/$b/$a] returning state: Vector() 
[INFO] [06/19/2017 10:32:07.754] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/$b/$b] returning state: Vector() 

正如你可以看到,作爲ParentActor試圖收集美國立即直接在throwFirst命令之後,第二個有狀態actor(Vector(4))在第一個有狀態actor(Vector(3))傳播它的ParentActor崩潰之前(它只需要時間)返回它的狀態。這就是爲什麼這是碰撞傳播到ParentActor之間的競爭條件 - 因此也是所有有狀態的參與者 - 和收集狀態命令之間的競爭條件。

由於我的測試沒有通過你的情況,我假設一些參數(機器定時或任何延遲)是不同的。

編輯

至於回答您的評論: 在重啓的時候,ParentActor已經完成服務處理狀態的查詢。既然你只問兩個StatefulActors,然後把期貨交給pipeTo模式,ParentActor不再需要觸及這個未來,所以它可以繼續處理任何進來的東西。在這種情況下,這是它的一個崩潰報告兒童。因此,當第一個StatefulActor崩潰後,狀態查詢排隊等待在重新啓動後處理時,第二個StatefulActor接收到狀態查詢,並在接收到重新啓動命令之前接受它。因此,它正在同時處理,即ParentActor處理崩潰,而在不同的將來執行的pipeTo模式繼續運行狀態查詢。在這種情況下減輕這種情況的一個選擇是停止兒童演員而不是重新啓動他們。這將使pipeTo未來超時,因爲第一個參與者不會響應,因此沒有可能不一致的狀態會被泄漏。

+0

感謝您指出缺少的圓括號,我也沒有意識到case對象的屬性。 然而,請您詳細說明您看到的比賽嗎?我設法讓我的測試通過只添加缺少的括號,不需要等待。 此外,我不能讓你的測試通過 - 我不斷讓List(Vector(),Vector())不等於List(Vector(),Vector(4))''。你可以仔細檢查一下嗎? ParentActor的'state'接收事件中的'ask'是否處理了比賽的問題(即,因爲第一個參與者可以重新啓動,所以第二個參與者不應該準備好呢?)? –

+0

我會再次檢查,當我有機會(從現在兩天)。 – thwiegan

+0

@ SebastianKramer爲了確保,我們運行的是相同的設置,您使用的是哪個版本(我正在運行akka 2.5.2)?如果我運行你的測試(沒有任何延遲),我得到'List(Vector(),Vector(4))不等於List(Vector(),Vector())'。哪個btw。也暗示了一場比賽,因爲它不是100%可重複的,首先會發生什麼。也可能依賴於機器時序等。 – thwiegan