2017-01-30 146 views
1

下面的演員表演一次,分子和分母都收到一個部門,阿卡演員的消息聚集

package funnelTest 

import akka.actor.{Actor, ActorSystem, Props} 

object Main extends App { 

    val system1 = ActorSystem("funnelTest") 
    val input = system1.actorOf(Props[Funnel], "input") 

    input ! 3 
    input ! 2.718 

} 

case object Run 

class Funnel extends Actor { 

    var i: Option[Int] = None 
    var d: Option[Double] = None 

    def isReady = i.nonEmpty && d.nonEmpty 

    def receive = { 
    case v: Int => i = Some(v) ; if (isReady) self ! Run 
    case v: Double => d = Some(v) ; if (isReady) self ! Run 
    case Run  => println(s"aggregated, $d/$i = " + d.get/i.get) 
    case _   => 
    } 
} 

有聚合所有的消息更可擴展的方式?

+0

爲什麼你認爲你的實現是不可擴展的? (它可以稍微改進,用一個新的方法run()來替換「!Run」,這個方法做了現在的「case Run」)。 –

回答

1

標識請求的唯一標識符是解決問題的一種方法。演員內的地圖(calcRegistry)擁有先前到達的FractionComponentNumeratorDenominator)。一旦第二部分進入,我們就可以開始按照你已經完成的計算來運行計算。

該實現仍然沒有解決內存泄漏的問題,其中第二對將不會被接收,並且地圖將繼續增長。

import akka.actor.{Actor, ActorSystem, Props} 

object Main extends App { 

    import Funnel._ 

    val system1 = ActorSystem("funnelTest") 
    val input = system1.actorOf(Props[Funnel], "input") 

    (1 to 10) foreach { number => 

    val id = java.util.UUID.randomUUID().toString 
    input ! Numerator(id, value = number + 2) 
    input ! Denominator(id, value = number + 1) 
    } 

    system1.awaitTermination() 

} 

class Funnel extends Actor { 

    import Funnel._ 
    import scala.collection._ 

    val calcRegistry = mutable.Map[String, FractionComponent]() 

    def saveToRegistry(comp: FractionComponent) = calcRegistry(comp.id) = comp 

    def printValue(num: Numerator, den: Denominator) = println(s"aggregated, ${num.value}/${den.value} = ${num.value/den.value}") 

    def receive = { 
    case [email protected](id, _) => 
     if (calcRegistry contains id) 
     self ! Run(num, calcRegistry(id).asInstanceOf[Denominator]) 
     else saveToRegistry(num) 
    case [email protected](id, _) => 
     if (calcRegistry contains id) 
     self ! Run(calcRegistry(id).asInstanceOf[Numerator], den) 
     else saveToRegistry(den) 
    case Run(num: Numerator, den: Denominator) => 
     calcRegistry.remove(num.id) 
     printValue(num, den) 
    case _ => 
    } 
} 

object Funnel { 

    sealed trait FractionComponent { 
    def id: String 
    } 

    case class Numerator(override val id: String, value: Double) extends FractionComponent 

    case class Denominator(override val id: String, value: Integer) extends FractionComponent 

    case class Run(num: Numerator, denominator: Denominator) 

} 

輸出示例:

aggregated, 3.0/2 = 1.5 aggregated, 4.0/3 = 1.3333333333333333 aggregated, 5.0/4 = 1.25 aggregated, 6.0/5 = 1.2 aggregated, 7.0/6 = 1.1666666666666667 aggregated, 8.0/7 = 1.1428571428571428 aggregated, 9.0/8 = 1.125 aggregated, 10.0/9 = 1.1111111111111112 aggregated, 11.0/10 = 1.1 aggregated, 12.0/11 = 1.0909090909090908

參考:Reactive Messaging Patterns with the Actor Model