我有一個ActorA從輸入流中讀取消息並將消息發送給一組ActorB's。當ActorA到達輸入流的末尾時,它清除其資源,向ActorB廣播完成消息,並關閉它。我如何確定所有演員都收到廣播消息
我有大約12個ActorB發送消息給一組ActorC's。當ActorB從ActorA收到完成消息後,它清除其資源並關閉自己,除了最後倖存的ActorB在ActorC關閉之前向ActorC廣播完成消息之外。
我有大約24個ActorC發送消息給單個ActorD。與ActorB類似,當每個ActorC獲得完成消息時,它清除其資源並關閉自身,除了最後倖存的向ActorD發送完成消息的ActorC之外。
當ActorD獲取完成消息時,它清除其資源並關閉它。
最初我有ActorB和ActorC在收到它時立即傳播完成消息,但這可能會導致ActorC在所有ActorB完成處理隊列之前關閉;同樣,在ActorC完成隊列處理之前,ActorD可能會關閉。
我的解決方案是使用在所述ActorB的
class ActorB(private val actorCRouter: ActorRef,
private val actorCount: AtomicInteger) extends Actor {
private val init = {
actorCount.incrementAndGet()
()
}
def receive = {
case Done => {
if(actorCount.decrementAndGet() == 0) {
actorCRouter ! Broadcast(Done)
}
// clean up resources
context.stop(self)
}
}
}
ActorC之間共享使用類似的代碼的AtomicInteger,每個ActorC共享一個的AtomicInteger。
目前所有的角色都是在一個web服務方法中初始化的,下游ActorRef被傳入上游角色的構造函數中。
是否有一個首選的方法來做到這一點,例如:使用調用Akka方法而不是AtomicInteger?
編輯:我考慮以下作爲一個可能的替代:當一個演員接收Done消息它設置接收超時時間爲5秒(該程序將需要一個多小時的運行,所以延遲清理/幾秒鐘關機不會影響性能);當演員獲得ReceiveTimeout時,它向下遊演員廣播完成,清理並關閉。 (對於ActorB和ActorC路由器使用的是SmallestMailboxRouter)
class ActorB(private val actorCRouter: ActorRef) extends Actor {
def receive = {
case Done => {
context.setReceiveTimeout(Duration.create(5, SECONDS))
}
case ReceiveTimeout => {
actorCRouter ! Broadcast(Done)
// clean up resources
context.stop(self)
}
}
}
有這樣的演員之間的任何形式的共享狀態是一個非常糟糕的主意。單獨的生命週期關注並確保您的參與者堅持單一責任原則。這樣做會更容易。 – Ryan 2014-09-29 00:35:24