2014-10-31 118 views
1

我有一個可以接收「getItems」消息的演員「ItemProvider」。 ItemProvider管理項目的項目。因此,我可以有幾個「getItems」消息請求項目A的項目和其他請求項目B的項目的「getItems」消息。什麼是推遲信息的最佳方式?

第一次「itemProvider」得到這樣的消息時,它需要調用服務來實際獲取項目 (這可能需要一分鐘的時間,服務會返回未來,因此它不會阻止演員)。在此等待期間,可以到達其他「getItems」消息。

項目「ItemProvider」緩存它從服務接收到的「Items」。 所以在1分鐘的加載時間後,它可以立即提供物品。

我很確定「ItemProvider」應該使用Akka的功能。但是,它應該如何處理它不能馬上服務的客戶呢?

我能想到的下列選項:

  1. ItemProvider持有列表pendingMessages。而它不能提供的信息被添加到這個列表中。當ItemProvider「準備就緒」時,它將處理待處理客戶

  2. ItemProvider將消息發送回其父項。父母將重新發出消息

  3. ItemProvider使用調度程序。並再次獲取消息。

  4. 也許不能使用成爲,但使用AbstractFSM類?

    有沒有人知道最好的阿卡方式來實現ItemProvider?

+0

我看起來好像可以爲他們服務,因爲您使用的是'Future',但是如果您在很短的時間內獲得相同的請求,那麼這可能是多餘的工作。你需要一種方式來告訴後續的請求你已經有了一個響應。所以,是的,你可能需要一個從請求到未來的Map,當有新的請求時,請檢查Map,如果它有一個Future,那麼你可以發送Future,也迴應這位客戶。 – 2014-10-31 09:19:57

+0

我不認爲你可以使用'成爲'。當你有一臺狀態機時,你需要'成爲',並且你有多個狀態機。每個單獨的請求都有一個狀態機,有兩個狀態:獲取和獲取。 – 2014-10-31 09:20:50

+0

我完全是akka的初學者。但我不明白爲什麼我不能將客戶列入待處理名單。項目準備就緒後,Itemprovider可以發送所有等待的客戶端結果。這在我看來是一個不錯的選擇 – jack 2014-10-31 09:41:08

回答

0

下面你以後會找到一個可能的方法來組織演員以滿足您的要求。在這個解決方案中,我將使用每個項目的actor實例來緩存特定於該項目的項目。然後,我將使用一個路由參與者,該參與者將接收請求以獲取項目的項目並將其委派給正確處理該項目緩存的子actor。在實際的緩存參與者中,您會看到我使用了stash/unstash來處理請求,直到緩存項目被加載(我在代碼中進行模擬)。代碼如下:

import akka.actor._ 
import scala.concurrent.Future 
import akka.pattern._ 
import concurrent.duration._ 
import akka.util.Timeout 

class ItemProviderRouter extends Actor{ 
    import ItemProvider._ 

    def receive = { 
    case get @ GetItems(project) => 

     //Lookup the child for the supplied project. If one does not 
     //exist, create it 
     val child = context.child(project).getOrElse(newChild(project)) 
     child.forward(get) 
    } 

    def newChild(project:String) = { 
    println(s"creating a new child ItemProvider for project $project") 
    context.actorOf(Props[ItemProvider], project) 
    } 

} 

object ItemProvider{ 
    case class GetItems(project:String) 
    case class Item(foo:String) 
    case class LoadedItems(items:List[Item]) 
    case object ClearCachedItems 
    case class ItemResults(items:List[Item]) 
} 

class ItemProvider extends Actor with Stash{ 
    import ItemProvider._ 

    //Scheduled job to drop the cached items and force a reload on subsequent request 
    import context.dispatcher 
    context.system.scheduler.schedule(5 minutes, 5 minutes, self, ClearCachedItems) 

    def receive = noCachedItems 

    def noCachedItems:Receive = { 
    case GetItems(project) => 
     stash()  
     fetchItems(project) 
     context.become(loadingItems) 


    case ClearCachedItems => 
     //Noop 
    } 

    def loadingItems:Receive = { 
    case get:GetItems => stash 

    case LoadedItems(items) => 
     println(s"Actor ${self.path.name} got items to cache, changing state to cachedItems") 
     context.become(cachedItems(items)) 
     unstashAll()  

    case ClearCachedItems => //Noop  
    } 

    def cachedItems(items:List[Item]):Receive = { 
    case GetItems(project) => 
     sender ! ItemResults(items) 

    case ClearCachedItems => 
     println("Clearing out cached items") 
     context.become(noCachedItems)  

    case other => 
     println(s"Received unexpected request $other when in state cachedItems")   
    } 

    def fetchItems(project:String){ 
    println(s"Actor ${self.path.name} is fetching items to cache") 

    //Simulating doing something that results in a Future 
    //representing the items to cache  

    val fut = Future{ 
     Thread.sleep(5000) 
     List(Item(s"hello $project"), Item(s"world $project")) 
    } 

    fut.map(LoadedItems(_)).pipeTo(self) 
    } 
} 

然後對其進行測試:

object ItemProviderTest extends App{ 
    import ItemProvider._ 
    val system = ActorSystem("test") 
    import system.dispatcher 
    val provider = system.actorOf(Props[ItemProviderRouter]) 

    implicit val timeout = Timeout(10 seconds) 
    for(i <- 1 until 20){ 
    val afut = provider ? GetItems("a") 
    val bfut = provider ? GetItems("b") 

    afut onSuccess{ 
     case ItemResults(items) => println(s"got items list of $items for project a") 
    } 

    bfut onSuccess{ 
     case ItemResults(items) => println(s"got items list of $items for project b") 
    }  
    } 
} 

爲了簡單起見,我使用的是實際的演員做路由,而不是一個自定義的路由器,但你也可以在這裏實現一個自定義路由器,如果性能(即郵箱命中)對你很重要。

+0

將fetchItems放入fetchItemsLoader參數中有意義嗎?我會獲得什麼? – jack 2014-11-01 18:03:55

+0

@jack,當然,如果你想從緩存和服務部分分開加載部分,那麼我不明白爲什麼不。如果是這種情況,那麼你可以在那裏使用'ask(?)'並且像我在這個例子中那樣獲得未來和管道回到自己,或者使用'tell(!)'並且直接等待響應需要一個未來。 – cmbaxter 2014-11-02 16:00:31

0

客戶端應按計劃重新發送冪等請求,直到它收到滿意的答覆或超時爲止。

您是否需要更多的ItemProviders,或者ItemProvider批次請求取​​決於所查詢資源的性質。如果每分鐘只能發出1個請求,則應該在ItemProvider中批量處理請求。然而,客戶有責任確保它繼續請求答覆,直至滿意爲止。它不應該依賴ItemProvider來可靠地記住請求。

1

看看Akka的Stash featureusage example)。 下面是(未經測試)代碼藏匿getItems信息,同時從服務器請求實際的項目,然後處理所有getItems郵件的服務器請求完成

import akka.actor.{Actor, Stash} 

class ItemProviderActor extends Actor with Stash { 
    private[this] itemsOpt : Option[Items] = None 

    def receive = processing 

    def processing: Receive = { 
    case m:GetItems => { 
     if(itemsOpt.nonEmpty) { 
     // respond immediately 
     itemsOpt.foreach(sender() ! _) 
     } 
     else { 
     // Stash current request and initiate cache update 
     context.become(retrivingData) 
     stash() 

     // Will send future results of item retrieval as a message to self 
     retrieveItems().pipeTo(self) 
     } 
    } 
    } 

    def retrivingData: Receive = { 
    case m: Items => 

     // items are retrieved, update cache 
     itemsOpt = Option(m) 

     // resume normal processing 
     context.become(processing) 

     // put all pending item requests back to actor's message queue 
     unstashAll() 


    case m:GetItems => 
     // busy retrieving items, store request to serve later 
     stash() 
    } 

    def retrieveItems() : Future[Items] = { 
    ??? 
    } 

} 
+0

這看起來非常像我需要的東西。我明天嘗試一下!非常感謝 – jack 2014-10-31 17:20:32

相關問題