2015-08-28 34 views
2

我有這種情況:演員是否可以在一定條件下閱讀信息?

  • ActorA發送每30-40秒
  • ActorA發送ActorB字符串打印(總是)
  • ActorB必須打印,他收到的琴絃ActorB開始/停止的消息,但只如果ActorA派只是一個開始消息

現在我不知道如果我可以做以下的事情:

  • ActorB只能在特定條件下(如果布爾值設置爲true)讀取消息而不丟失在布爾值設置爲false時收到的消息?
  • ActorB可以在其他字符串消息之前從ActorA讀取開始/停止消息嗎?我想有這種情況:ActorA發送一條開始消息給ActorB,ActorB開始打印他在開始消息之前收到的字符串(並且仍在接收),然後一旦收到停止消息就立即停止?

我不知道我是否解釋得好。

編輯:謝謝你,答案很好,但我仍然有一些疑慮。

  • 是否成爲消息的順序?我的意思是,如果我有「Start-M1-M2-Stop-M3-M4-M5-Start-M6-M7-Stop」,打印順序是「M1-M2」,然後是「M3-M4-M5-M6 -M7「,或者可以在M3,M4和M5之前讀取M6(如果M6在變成後立即被接收)?

  • 我可以給予更高的優先級來啓動/停止消息嗎?如果ActorB收到「M1-M2-M3」,然後在打印「M1」時收到停止消息,我希望ActorB再次保存M2和M3。

回答

2

您可以準確地解決您的問題,其中Stash特徵和become/unbecome功能的Akka。這個想法如下:

當您收到一個Stop消息,然後你切換到一種行爲,你藏匿所有不是Start的郵件。當您收到Start消息,然後切換到您打印所有收到的消息,另外你unstash其在此期間已經到達所有消息的行爲。

case object Start 
case object Stop 
case object TriggerStateChange 
case object SendMessage 

class ActorB extends Actor with Stash { 
    override def receive: Receive = { 
    case Start => 
     context.become(printingBehavior, false) 
     unstashAll() 
    case x => stash() 
    } 

    def printingBehavior: Receive = { 
    case msg: String => println(msg) 
    case Stop => context.unbecome() 
    } 
} 

class ActorA(val actorB: ActorRef) extends Actor { 

    var counter = 0 
    var started = false 

    override def preStart: Unit = { 
    import context.dispatcher 

    this.context.system.scheduler.schedule(0 seconds, 5 seconds, self, TriggerStateChange) 
    this.context.system.scheduler.schedule(0 seconds, 1 seconds, self, SendMessage) 
    } 

    override def receive: Actor.Receive = { 
    case SendMessage => 
     actorB ! "Message: " + counter 
     counter += 1 
    case TriggerStateChange => 
     actorB ! (if (started) { 
     started = false 
     Stop 
     } else { 
     started = true 
     Start 
     }) 
    } 
} 

object Akka { 
    def main(args: Array[String]) = { 
    val system = ActorSystem.create("TestActorSystem") 

    val actorB = system.actorOf(Props(classOf[ActorB]), "ActorB") 
    val actorA = system.actorOf(Props(classOf[ActorA], actorB), "ActorA") 

    system.awaitTermination() 
    } 
} 
+0

'unstashAll()'是否保留了消息的順序?我的意思是,如果我有 「開始-M1-M2-停止-M3-M4-M5-開始-M6-M7停止」,將打印順序是 「M1-M2-M3-M4-M5-M6-M7」或者可以在M3,M4和M5之前讀取M6(如果在'unstashAll()'之後接收到M6)? –

+0

@AsAs,內部的'unstashAll()'方法調用郵箱上的enqueueFirst。因此,我假設你將首先按照它們被隱藏的順序接收你的未壓縮的消息,然後接收在'unstashAll()'調用期間或之後收到的任何消息。 –

+0

謝謝。另一件事。我可以給予更高的優先級來啓動/停止消息嗎?如果ActorB收到「Start-M1-M2-M3」,然後在打印「M1」時收到停止消息,我希望ActorB再次保存M2和M3。可能嗎? –

2

檢查Become/Unbecome的功能。它可以讓你改變演員的行爲。

如果我理解正確,您希望您的ActorB具有兩種不同的狀態。在第一個狀態中,它應該緩存它收到的消息。在第二個狀態中,它應該打印所有緩存的消息並開始打印所有新消息。

像這樣:

case class Start 
case class Stop 
case class Message(val: String) 

class ActorB extends Actor { 

    var cache = Queue() 

    def initial: Receive = { 
    case Message(msg) => cache += msg 
    case Start => 
     for (m <- cache) self ! Message(m) 
     context.become(printing) 
    } 

    def printing: Receive = { 
    case Message(msg) => println(msg) 
    case Stop => context.become(initial) //or stop the actor 
    } 

    def receive = initial 
} 
+0

1)不應該'變成'線後的'換行'? 2)是否'become'十個分量消息的訂單?我的意思是,如果我有 「開始-M1-M2-停止-M3-M4-M5-開始-M6-M7停止」,將打印順序是 「M1-M2-M3-M4-M5-M6-M7」或者可以在M3,M4和M5之前讀取M6(如果它是在'成爲'之後收到的)? –

0

具有兩個狀態(兩個不同的行爲)之間演員乙交替。在最初的「掛起」狀態中,它等待「開始」消息,而其他消息則爲stashing

在收到'開始'消息後,取消所有存儲的消息並且become'激活',等待'停止'消息並寫出接收到的其他消息(其將包括未消除的消息)。在收到「停止」信息時,請回復「待處理」行爲。

0

我的一些想法

  1. 是如果布爾標誌從像數據庫或配置文件中的一些系統資源了,但我不認爲它應該是依賴於任何外部消息,因爲演員接收來自多個其他演員的消息。如果ActorB僅用於ActorA,兩者可以合併到一個

  2. 爲1類似,如何處理來自多個參與者的消息?如果只有一個演員A,則兩個演員可以合併。如果有多個,可以在數據庫中設置標誌,actorA將數據庫中的標誌更改爲「開始」或「停止」。演員B將根據該標誌打印或不打印。

一個演員應該對其他演員的狀態非常獨立。啓動和停止實際上是ActorA而不是ActorB的某種狀態

0

您已經有很多很好的答案,但不知何故,我覺得有必要提供一些更簡單,因爲你需要的是不一定的狀態機:

class ActorB extends Actor { 
    def receive: Receive = caching(Nil) 

    def caching(cached: List[String]): Receive = { 
    case msg: String => 
     context.become(caching(msg :: cached)) 
    case Start => 
     cached.reverse.foreach(println) 
     context.become(caching(Nil)) 
    } 
}