下面你以後會找到一個可能的方法來組織演員以滿足您的要求。在這個解決方案中,我將使用每個項目的actor實例來緩存特定於該項目的項目。然後,我將使用一個路由參與者,該參與者將接收請求以獲取項目的項目並將其委派給正確處理該項目緩存的子actor。在實際的緩存參與者中,您會看到我使用了stash/unstash來處理請求,直到緩存項目被加載(我在代碼中進行模擬)。代碼如下:
import akka.actor._
import scala.concurrent.Future
import akka.pattern._
import concurrent.duration._
import akka.util.Timeout
class ItemProviderRouter extends Actor{
import ItemProvider._
def receive = {
case get @ GetItems(project) =>
//Lookup the child for the supplied project. If one does not
//exist, create it
val child = context.child(project).getOrElse(newChild(project))
child.forward(get)
}
def newChild(project:String) = {
println(s"creating a new child ItemProvider for project $project")
context.actorOf(Props[ItemProvider], project)
}
}
object ItemProvider{
case class GetItems(project:String)
case class Item(foo:String)
case class LoadedItems(items:List[Item])
case object ClearCachedItems
case class ItemResults(items:List[Item])
}
class ItemProvider extends Actor with Stash{
import ItemProvider._
//Scheduled job to drop the cached items and force a reload on subsequent request
import context.dispatcher
context.system.scheduler.schedule(5 minutes, 5 minutes, self, ClearCachedItems)
def receive = noCachedItems
def noCachedItems:Receive = {
case GetItems(project) =>
stash()
fetchItems(project)
context.become(loadingItems)
case ClearCachedItems =>
//Noop
}
def loadingItems:Receive = {
case get:GetItems => stash
case LoadedItems(items) =>
println(s"Actor ${self.path.name} got items to cache, changing state to cachedItems")
context.become(cachedItems(items))
unstashAll()
case ClearCachedItems => //Noop
}
def cachedItems(items:List[Item]):Receive = {
case GetItems(project) =>
sender ! ItemResults(items)
case ClearCachedItems =>
println("Clearing out cached items")
context.become(noCachedItems)
case other =>
println(s"Received unexpected request $other when in state cachedItems")
}
def fetchItems(project:String){
println(s"Actor ${self.path.name} is fetching items to cache")
//Simulating doing something that results in a Future
//representing the items to cache
val fut = Future{
Thread.sleep(5000)
List(Item(s"hello $project"), Item(s"world $project"))
}
fut.map(LoadedItems(_)).pipeTo(self)
}
}
然後對其進行測試:
object ItemProviderTest extends App{
import ItemProvider._
val system = ActorSystem("test")
import system.dispatcher
val provider = system.actorOf(Props[ItemProviderRouter])
implicit val timeout = Timeout(10 seconds)
for(i <- 1 until 20){
val afut = provider ? GetItems("a")
val bfut = provider ? GetItems("b")
afut onSuccess{
case ItemResults(items) => println(s"got items list of $items for project a")
}
bfut onSuccess{
case ItemResults(items) => println(s"got items list of $items for project b")
}
}
}
爲了簡單起見,我使用的是實際的演員做路由,而不是一個自定義的路由器,但你也可以在這裏實現一個自定義路由器,如果性能(即郵箱命中)對你很重要。
我看起來好像可以爲他們服務,因爲您使用的是'Future',但是如果您在很短的時間內獲得相同的請求,那麼這可能是多餘的工作。你需要一種方式來告訴後續的請求你已經有了一個響應。所以,是的,你可能需要一個從請求到未來的Map,當有新的請求時,請檢查Map,如果它有一個Future,那麼你可以發送Future,也迴應這位客戶。 – 2014-10-31 09:19:57
我不認爲你可以使用'成爲'。當你有一臺狀態機時,你需要'成爲',並且你有多個狀態機。每個單獨的請求都有一個狀態機,有兩個狀態:獲取和獲取。 – 2014-10-31 09:20:50
我完全是akka的初學者。但我不明白爲什麼我不能將客戶列入待處理名單。項目準備就緒後,Itemprovider可以發送所有等待的客戶端結果。這在我看來是一個不錯的選擇 – jack 2014-10-31 09:41:08