2011-11-07 64 views
7

問題陳述:我有一個需要以並行方式處理的證券組合。在Java中,我使用了一個線程池來處理每個安全性,並使用鎖定器來倒計時。一旦完成,我做一些合併等。使用Akka分叉和加入

所以我發送我的SecurityProcessor(這是一個演員),並等待所有的期貨完成。最後,我使用MergeHelper進行後期處理。該SecurityProcessor需要一個安全,做一些I/O和處理起來,回覆安全

val listOfFutures = new ListBuffer[Future[Security]]() 
    var portfolioResponse: Portfolio = _ 
    for (security <- portfolio.getSecurities.toList) { 
    val securityProcessor = actorOf[SecurityProcessor].start() 
    listOfFutures += (securityProcessor ? security) map { 
     _.asInstanceOf[Security] 
    } 
    } 
    val futures = Future.sequence(listOfFutures.toList) 
    futures.map { 
    listOfSecurities => 
     portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities) 
    }.get 

這是設計正確的,是有使用阿卡來實現這一共同問題的一個更好/更酷的方式?

回答

8
val futureResult = Future.sequence(
        portfolio.getSecurities.toList map { security => (actorOf[SecurityProcessor].start() ? security).mapTo[Security] } 
       ) map { securities => MergeHelper.merge(portfolio, securities) } 
+0

真的喜歡這個建議和按預期方式工作,直到我不得不分割並添加一堆Eventhandler.info語句來調試問題:( –

+0

DEF調試[T](T:T):T = {事件處理程序.info(t); t} –

+0

akka is awesome !! –