2014-06-25 39 views
1

我正在嘗試爲一組Akka actor實現負載均衡。根據documentationBalancingPool提供了我感興趣的行爲(盜取工作)。出於某種原因,沒有「組變體」可以讓我自己創建路由並將它們傳遞給路由器。該文檔明確指出Akka BalancingPool:沒有組變體?

BalancingPool沒有組變體。

不用說爲什麼。我的actor的構造函數是在運行時調用的參數來調用的,所以我沒有別的選擇,只能以編程方式創建它們。

val resources:List[Any] = // ... 
val system = ActorSystem("MySystem") 
val routees = resources.map(r => system.actorOf(MyActor.props(r)) 
// This doesn't work for me, because every actor needs a resource! 
val router = system.actorOf(BalancingPool(3).props(Props[MyActor]), "router") 

如何創建一個BalancingPool路由器,將所有消息轉發給我的演員?爲什麼沒有BalancingGroup

+0

我假設沒有BalancingGroup,因爲路由必須共享一個郵箱。但是,當您自己創建路線時,每個人都有自己的郵箱。 –

回答

1

我不認爲有辦法讓BalancingPool與你的演員一起工作,但你可以改變你的演員,讓他們收到一個初始消息,爲他們提供他們需要設置的信息。如果添加這個類:

case class Setup(getResource:() => Any) 

而這些行到你的演員:

def receive = { 
    case Setup(getResource) => { 
    context.become(afterSetupReceive(getResource())) 
    } 
} 

def afterSetupReceive(resource: Any) = { 
    /* put cases from original recieve here */ 
} 

您可以創建自定義BalancedPool這樣的:

val router = system.actorOf(BalancingPool(resources.length).props(Props[MyActor]), "router") 
val resourcesIter = resources.iterator 
val getResource =() => resourcesIter.synchronized { 
    resourcesIter.next 
} 
router ! Broadcast(Setup(getResource)) 

周圍的迭代器同步的包裝是可怕的,但這會讓你解決BalancedPool中的限制。

+0

非常感謝!我從來沒有想過自己:)我雖然得到一個奇怪的錯誤:https://gist.github.com/anonymous/f8f09897e667db4d176b。看起來像BalancingPool正在創建具有無效名稱的演員? –

+2

@wingedsubmariner我不確定此功能是否可靠(由於共享郵箱)(另請參閱此[討論](https://groups.google.com/forum/#!msg/akka-user/D-qVGymeIKw/irpb1uIqtdEJ ))和支持發送廣播消息到BalancedPool應該被刪除:https://github.com/akka/akka/issues/15030 –

+0

@VolkerStampa現在我想到了,這是有道理的。你有什麼建議來解決這個問題嗎? –

0

既然廣播到BalancePool不是一個好主意,我想出了一個解決方法。我不是發送廣播,而是在演員的伴侶對象中創建一個包含資源的同步隊列。在創建之後,每個角色都會從該隊列中取出一個元素。不是一個漂亮的解決方案,但它的作品。

// Router creation 
val router = system.actorOf(BalancingPool(resources.length).props(Props[MyActor]), "router") 

// In MyActor.scala 
class MyActor { 
    val resource = MyActor.resourceQueue.dequeue() 

    // ... 
} 

object MyActor { 
    val resourceQueue = { 
    val queue = new mutable.SynchronizedQueue[MyResource]() 
    queue.enqueue(...) // <-- resources 
    queue 
    } 
}