2014-03-31 57 views
5

等待Akka中多個演員的結果的正確方法是什麼?在Akka中等待多個結果

Principles of Reactive Programming Coursera課程有一個複製鍵值存儲的練習。沒有深入分析任務的細節,它需要等待多個參與者的確認,然後才能指示覆制已完成。

我使用包含未完成請求的可變映射來執行分配,但是我覺得解決方案有'難聞的氣味'。我希望有更好的方法來實現看起來像一個常見的情況。

爲了通過阻止我的解決方案來堅持課程的榮譽代碼,我有一個抽象用例來描述類似的問題。

發票行項目需要計算其納稅義務。納稅義務是跨多個稅務機關(例如聯邦,州,警區)應用於該行項目的所有稅項的組合。如果每個稅收管理機構都是能夠確定該行項目的納稅義務的行爲人,則該行項目需要所有行爲主體進行報告,然後才能繼續報告整體納稅義務。在Akka中完成這種情況的最佳方式是什麼?

+0

考慮將稅收管理機構作爲對象而不是演員,並將行項目的納稅責任計算爲未來,如http://danielwestheide.com/blog/2013/01/09/the-新手指南-scala-part-8-welcome-to-the-future.html –

+0

您是否需要爲每個訂單項使用Actor? [Akka docs](http://doc.akka.io/docs/akka/2.3.1/scala/futures.html#Use_Directly)建議爲這種情況評估「未來」。 – tariksbl

回答

0

這是阿卡的一個非常普遍的問題。你有多個演員會爲你做這個工作,你需要把它們結合起來。

Jammie Allen在其着作「Effective Akka」中提出的解決方案(關於從各種賬戶中獲取銀行賬戶餘額)是你派生出一個能派生多個演員來完成這項工作的演員(例如計算你稅)。它會等待所有人回答。

一個捕獲,你不應該使用ask但登記tell

當您產生多個參與者(例如FederalTaxactor,StateTaxActor ...)時,您會向他們發送一條消息並提供他們需要處理的數據。然後你知道你需要收集多少個答案。隨着每一個迴應你檢查是否所有的答覆都在那裏。如果沒有,請等待。

問題是,如果任何演員失敗,您可能會永遠等待。所以你給自己安排一個超時信息。如果不是所有的答案都在那裏,那麼您返回的操作沒有成功完成。

Akka有一個特殊的工具來安排一個超時給自己作爲一個好幫手。

3

這是我相信你在找的一個簡單的例子。它顯示了像演員這樣的主人如何產生一些童工,然後等待他們的所有反應,處理可能發生超時等待結果的情況。該解決方案顯示瞭如何等待初始請求,然後在等待響應時切換到新的接收函數。它還展示瞭如何將狀態傳播到等待接收函數中,以避免在實例級別具有顯式的可變狀態。

object TaxCalculator { 
    sealed trait TaxType 
    case object StateTax extends TaxType 
    case object FederalTax extends TaxType 
    case object PoliceDistrictTax extends TaxType 
    val AllTaxTypes:Set[TaxType] = Set(StateTax, FederalTax, PoliceDistrictTax) 

    case class GetTaxAmount(grossEarnings:Double) 
    case class TaxResult(taxType:TaxType, amount:Double) 

    case class TotalTaxResult(taxAmount:Double) 
    case object TaxCalculationTimeout 
} 

class TaxCalculator extends Actor{ 
import TaxCalculator._ 
import context._ 
import concurrent.duration._ 

    def receive = waitingForRequest 

    def waitingForRequest:Receive = { 
    case gta:GetTaxAmount => 
     val children = AllTaxTypes map (tt => actorOf(propsFor(tt))) 
     children foreach (_ ! gta) 
     setReceiveTimeout(2 seconds) 
     become(waitingForResponses(sender, AllTaxTypes)) 
    } 

    def waitingForResponses(respondTo:ActorRef, expectedTypes:Set[TaxType], taxes:Map[TaxType, Double] = Map.empty):Receive = { 
    case TaxResult(tt, amount) => 
     val newTaxes = taxes ++ Map(tt -> amount) 
     if (newTaxes.keySet == expectedTypes){ 
     respondTo ! TotalTaxResult(newTaxes.values.foldLeft(0.0)(_+_)) 
     context stop self 
     } 
     else{ 
     become(waitingForResponses(respondTo, expectedTypes, newTaxes)) 
     } 

    case ReceiveTimeout => 
     respondTo ! TaxCalculationTimeout 
     context stop self 
    } 

    def propsFor(taxType:TaxType) = taxType match{ 
    case StateTax => Props[StateTaxCalculator] 
    case FederalTax => Props[FederalTaxCalculator] 
    case PoliceDistrictTax => Props[PoliceDistrictTaxCalculator] 
    } 
} 

trait TaxCalculatingActor extends Actor{ 
    import TaxCalculator._ 
    val taxType:TaxType 
    val percentage:Double 

    def receive = { 
    case GetTaxAmount(earnings) => 
     val tax = earnings * percentage 
     sender ! TaxResult(taxType, tax) 
    } 
} 

class FederalTaxCalculator extends TaxCalculatingActor{ 
    val taxType = TaxCalculator.FederalTax 
    val percentage = 0.20 
} 

class StateTaxCalculator extends TaxCalculatingActor{ 
    val taxType = TaxCalculator.StateTax 
    val percentage = 0.10 
} 

class PoliceDistrictTaxCalculator extends TaxCalculatingActor{ 
    val taxType = TaxCalculator.PoliceDistrictTax 
    val percentage = 0.05 
} 

然後,你可以用下面的代碼測試了這一點:

import TaxCalculator._ 
import akka.pattern.ask 
import concurrent.duration._ 
implicit val timeout = Timeout(5 seconds) 

val system = ActorSystem("taxes") 
import system._ 
val cal = system.actorOf(Props[TaxCalculator]) 
val fut = cal ? GetTaxAmount(1000.00) 
fut onComplete{ 
    case util.Success(TotalTaxResult(amount)) => 
    println(s"Got tax total of $amount") 
    case util.Success(TaxCalculationTimeout) => 
    println("Got timeout calculating tax") 
    case util.Failure(ex) => 
    println(s"Got exception calculating tax: ${ex.getMessage}") 
} 
0

正如答覆中建議,你可能會發現撰寫期貨有幫助在這種情況下的能力之前 - 期貨最好的說明(和承諾,這是有點相關的)我知道的是在這裏:http://docs.scala-lang.org/overviews/core/futures.html

這可能有助於解釋組合期貨可以回答需要,可能比演員更乾淨,或與演員結合的方式。