2016-06-07 50 views
0

以下代碼示例(可以複製並運行)顯示MyParentActor,它創建一個MyChildActorAkka:Akka重新啓動後的消息排序

MyChildActor爲其第一條消息引發異常,導致其重新啓動。

但是,我想要實現的是在「消息2」重新啓動MyChildActor之前仍然處理「消息1」。

取而代之的是,消息1被添加到郵箱隊列的尾部,因此消息2被首先處理。

如何在重新啓動演員時獲得原始消息的排序,而無需創建自己的郵箱等?

object TestApp extends App { 
    var count = 0 
    val actorSystem = ActorSystem() 


    val parentActor = actorSystem.actorOf(Props(classOf[MyParentActor])) 
    parentActor ! "Message 1" 
    parentActor ! "Message 2" 

    class MyParentActor extends Actor with ActorLogging{ 
    var childActor: ActorRef = null 

    @throws[Exception](classOf[Exception]) 
    override def preStart(): Unit = { 
     childActor = context.actorOf(Props(classOf[MyChildActor])) 
    } 

    override def receive = { 
     case message: Any => { 
     childActor ! message 
     } 
    } 

    override def supervisorStrategy: SupervisorStrategy = { 
     OneForOneStrategy() { 
      case _: CustomException => Restart 
      case _: Exception   => Restart 
     } 
    } 
    } 

    class MyChildActor extends Actor with ActorLogging{ 


    override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
     message match { 
     case Some(e) => self ! e 
     } 
    } 

    override def receive = { 
     case message: String => { 
     if (count == 0) { 
      count += 1 
      throw new CustomException("Exception occurred") 
     } 
     log.info("Received message {}", message) 
     } 
    } 
    } 

    class CustomException(message: String) extends RuntimeException(message) 
} 

回答

1

您可以用特殊的信封標記失敗的消息,並將所有內容都存儲到該消息的接收中(請參閱子actor實現)。只需定義一個行爲,在該行爲中,除了特定的信封外,參與者將存儲每條消息,處理它的有效負載,然後將所有其他消息排除並返回到正常行爲。

這給了我:

INFO TestApp$MyChildActor - Received message Message 1 
INFO TestApp$MyChildActor - Received message Message 2 

object TestApp extends App { 
    var count = 0 
    val actorSystem = ActorSystem() 


    val parentActor = actorSystem.actorOf(Props(classOf[MyParentActor])) 
    parentActor ! "Message 1" 
    parentActor ! "Message 2" 

    class MyParentActor extends Actor with ActorLogging{ 
    var childActor: ActorRef = null 

    @throws[Exception](classOf[Exception]) 
    override def preStart(): Unit = { 
     childActor = context.actorOf(Props(classOf[MyChildActor])) 
    } 

    override def receive = { 
     case message: Any => { 
      childActor ! message 
     } 
    } 

    override def supervisorStrategy: SupervisorStrategy = { 
     OneForOneStrategy() { 
      case e: CustomException => Restart 
      case _: Exception => Restart 
     } 
    } 
    } 

    class MyChildActor extends Actor with Stash with ActorLogging{ 


    override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
     message match { 
      case Some(e) => 
       self ! Unstash(e) 
     } 
    } 

    override def postRestart(reason: Throwable): Unit = { 
     context.become(stashing) 
     preStart() 
    } 

    override def receive = { 
     case message: String => { 
      if (count == 0) { 
       count += 1 
       throw new CustomException("Exception occurred") 
      } 
      log.info("Received message {}", message) 
     } 
    } 

    private def stashing: Receive = { 
     case Unstash(payload) => 
      receive(payload) 
      unstashAll() 
      context.unbecome() 
     case m => 
      stash() 
    } 
    } 

    case class Unstash(payload: Any) 
    class CustomException(message: String) extends RuntimeException(message) 
} 
相關問題