2012-07-07 67 views
8

我有一個未知數量的作業要由已知(當然)數量的演員執行。演員完成工作後,初始工作數量可能會增加。也就是說,一個演員在完成任務後可能會添加一個新的工作來執行。如何知道演員是否空閒

我處理這個問題的方式是讓每個角色在完成作業時發回一條消息給主人,不僅是執行結果,還有一個'標誌'表示演員現在處於空閒狀態。主人有一個工作隊列和一個空閒的演員隊列,每當一個演員發送一個'工作完成信息'時,主人將檢查是否還有其他東西要做那個演員......等等等等,直到作業的隊列是空的,空閒的隊列已滿......那時我關閉了系統。這裏沒有太多的監督,所以我覺得我沒有正確地做...

我不使用路由器,因爲我找不到一種方法來查詢閒置的演員路由器,所以我的問題是:

什麼是「正確的」方式來處理我上面在Akka中描述的情況?

回答

7

你應該看看Akka's routing capabilites.SmallestMailboxRouter可能是你在找什麼。

作爲替代方案,您可以根據需要創建演員,即對於每個任務,都會動態創建新演員。中央演員跟蹤所有當前活動的演員。工人角色完成後,它會自行發送一個PoisonPill並通知主人有關其關閉(主動或通過標準Terminate消息,Akka將發送給監督角色)。一旦沒有更多的主動角色,即沒有更多的任務,控制器角色就會關閉系統。看完後評論

增加: 看看的SmallestMailboxLike,Scala的特質通過SmallestMailboxRouter混合來源。警告:您應該具備Scala的基本知識。但是,這通常是一個好主意,無論如何,如果你想用阿卡...的方法isProcessingMessage(ActorRef)可以理解爲isNotIdle(ActorRef)

// Returns true if the actor is currently processing a message. 
// It will always return false for remote actors. 
// Method is exposed to subclasses to be able to implement custom 
// routers based on mailbox and actor internal state. 
protected def isProcessingMessage(a: ActorRef): Boolean = a match { 
    case x: LocalActorRef ? 
    val cell = x.underlying 
    cell.mailbox.isScheduled && cell.currentMessage != null 
    case _ ? false 
} 

// Returns true if the actor currently has any pending messages 
// in the mailbox, i.e. the mailbox is not empty. 
// It will always return false for remote actors. 
// Method is exposed to subclasses to be able to implement custom 
// routers based on mailbox and actor internal state. 
protected def hasMessages(a: ActorRef): Boolean = a match { 
    case x: LocalActorRef ? x.underlying.mailbox.hasMessages 
    case _    ? false 
} 
+0

我經歷了Akka的路由,這就是爲什麼我決定不使用路由器(也許我應該寫我自己的),因爲我無法查詢路由器關於工人的閒置狀態(也許我錯了,你能指出我在哪裏看看?) – DodoTheDeadDoo 2012-07-07 20:17:19

+0

不是故意按下輸入...無論如何,我會研究毒丸解決方案,就像我現在正在做的一樣,我不殺死一個工人,我只是將它設置爲空閒狀態,以便它可以被重新利用......當我沒有更多的任務時而且所有的工人都比我知道我已經完成了。 – DodoTheDeadDoo 2012-07-07 20:21:24

+0

所以我會寫我自己的路由器:) – DodoTheDeadDoo 2012-07-11 14:24:00

1

另一種策略可以使用BalancingDispatcher和RoundRobinRouter(作爲一個演員「池」 )。從阿卡文檔:

BalancingDispatcher 
# This is an executor based event driven dispatcher that will try to redistribute work from busy actors to idle actors. 




# All the actors share a single Mailbox that they get their messages from. 

It is assumed that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors; i.e. the actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. 

# Sharability: Actors of the same type only 

# Mailboxes: Any, creates one for all Actors 

# Use cases: Work-sharing 

定義您的調度員application.conf或編程方式加載它在啓動時。

private final static Config akkaConfig = ConfigFactory.parseString(

      "my-dispatcher.type = BalancingDispatcher \n" + 
      "my-dispatcher.executor = fork-join-executor \n" + 
      "my-dispatcher.fork-join-executor.parallelism-min = 8 \n" + 
      "my-dispatcher.fork-join-executor.parallelism-factor = 3.0 \n" + 
      "my-dispatcher.fork-join-executor.parallelism-max = 64 " 
); 

然後爲路由定義路由器和調度器。

getContext().actorOf(new Props(MyActor.class).withRouter(new RoundRobinRouter(10)).withDispatcher("my-dispatcher"), "myActor"); 

那麼路由器將簡單地去「分配」的消息,並分發器將運行選定的演員(和它實現工作竊取以及)

-1

平衡調度程序將只使用一個郵箱的所有創建的演員都是用BalancingDispatcher創建的。所以它會讓你的工作變得簡單。