2014-09-28 52 views
2

我有一個ActorA從輸入流中讀取消息並將消息發送給一組ActorB's。當ActorA到達輸入流的末尾時,它清除其資源,向ActorB廣播完成消息,並關閉它。我如何確定所有演員都收到廣播消息

我有大約12個ActorB發送消息給一組ActorC's。當ActorB從ActorA收到完成消息後,它清除其資源並關閉自己,除了最後倖存的ActorB在ActorC關閉之前向ActorC廣播完成消息之外。

我有大約24個ActorC發送消息給單個ActorD。與ActorB類似,當每個ActorC獲得完成消息時,它清除其資源並關閉自身,除了最後倖存的向ActorD發送完成消息的ActorC之外。

當ActorD獲取完成消息時,它清除其資源並關閉它。

最初我有ActorB和ActorC在收到它時立即傳播完成消息,但這可能會導致ActorC在所有ActorB完成處理隊列之前關閉;同樣,在ActorC完成隊列處理之前,ActorD可能會關閉。

我的解決方案是使用在所述ActorB的

class ActorB(private val actorCRouter: ActorRef, 
      private val actorCount: AtomicInteger) extends Actor { 
    private val init = { 
    actorCount.incrementAndGet() 
    () 
    } 

    def receive = { 
    case Done => { 
     if(actorCount.decrementAndGet() == 0) { 
     actorCRouter ! Broadcast(Done) 
     } 
     // clean up resources 
     context.stop(self) 
    } 
    } 
} 

ActorC之間共享使用類似的代碼的AtomicInteger,每個ActorC共享一個的AtomicInteger。

目前所有的角色都是在一個web服務方法中初始化的,下游ActorRef被傳入上游角色的構造函數中。

是否有一個首選的方法來做到這一點,例如:使用調用Akka方法而不是AtomicInteger?


編輯:我考慮以下作爲一個可能的替代:當一個演員接收Done消息它設置接收超時時間爲5秒(該程序將需要一個多小時的運行,所以延遲清理/幾秒鐘關機不會影響性能);當演員獲得ReceiveTimeout時,它向下遊演員廣播完成,清理並關閉。 (對於ActorB和ActorC路由器使用的是SmallestMailboxRouter)

class ActorB(private val actorCRouter: ActorRef) extends Actor { 

    def receive = { 
    case Done => { 
     context.setReceiveTimeout(Duration.create(5, SECONDS)) 
    } 

    case ReceiveTimeout => { 
     actorCRouter ! Broadcast(Done) 
     // clean up resources 
     context.stop(self) 
    } 
    } 
} 
+1

有這樣的演員之間的任何形式的共享狀態是一個非常糟糕的主意。單獨的生命週期關注並確保您的參與者堅持單一責任原則。這樣做會更容易。 – Ryan 2014-09-29 00:35:24

回答

1

共享actorCount相關行動者之間是不是做一件好事。 Actor只能使用自己的狀態來處理消息。 如何爲ActorB類型的演員擁有ActorBCompletionHanlder actor?所有ActorB都將引用ActorBCompletionHanlder actor。每當ActorB收到完成消息,它可以做必要的清理,並簡單地將完成的消息傳遞給ActorBCompletionHanlder。 ActorBCompletionHanlder將維護狀態變量以保持計數。每次它收到完成的消息,它可以簡單地更新計數器。因爲這只是這個actor的狀態變量,所以不需要它是原子的,這樣就不需要任何明確的鎖定。 ActorBCompletionHanlder一旦收到上次完成的消息,就會向ActorC發送完成的消息。 這種方式共享activeCount不是演員之間,但只能由ActorBCompletionHanlder管理。同樣的事情可以重複其他類型。

A-> B的 - > BCompletionHanlder - > C'S - > CCompletionHandler - > d

其他辦法可以是對演員的埃維相關的一組一個監測演員。在監視器上使用watch api和兒童終止事件,您可以選擇決定在收到上次完成的消息後應該執行的操作。

val child = context.actorOf(Props[ChildActor]) 
    context.watch(child) 

    case Terminated(child) => { 
     log.info(child + " Child actor terminated") 
    }