2012-06-21 88 views
12

我在Akka docs中讀到,關閉附近演員的變量是危險的。Akka演員,期貨和關閉

警告

在這種情況下,你需要仔細避免關閉在 包含演員的參考,即不呼籲 封閉男主角方法從匿名演員類中。這將使 中斷演員封裝,並可能引入同步錯誤 和競爭條件,因爲其他演員的代碼將被同時安排到 到封閉演員。

現在,我有兩個演員,其中一個從第二個角色請求某個東西,並對結果做了一些處理。在下面這個例子中,我已經放在一起,演員累加器從演員號碼發現者檢索號碼,並將它們相加,沿途報告總和。

這可以以至少兩種不同的方式,因爲這例如具有示出了兩個不同的接收函數( VS )來完成。兩者的區別在於A未關閉計數器變量;相反,它等待一個整數並總結它,而B創建一個未來關閉計數器並做了總和。如果我正確理解這是如何工作的,這發生在爲處理onSuccess而創建的匿名actor中。

import com.esotericsoftware.minlog.Log 

import akka.actor.{Actor, Props} 
import akka.pattern.{ask, pipe} 
import akka.util.Timeout 
import akka.util.duration._ 

case object Start 
case object Request 


object ActorTest { 
    var wake = 0 

    val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator") 
    val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator") 

    Log.info("ActorTest", "Starting !") 

    accRef ! Start 
} 

class Accumulator extends Actor { 
    var counter = 0 

    implicit val timeout = Timeout(5 seconds) 

    // A: WITHOUT CLOSURE 
    def receive = { 
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self 
    case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start 
    } 
    // B: WITH CLOSURE 
    def receive = { 
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess { 
     case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start 
    } 
    } 
} 

class NumberGenerator extends Actor { 
    val rand = new java.util.Random() 

    def receive = { 
    case Request => sender ! rand.nextInt(11)-5 
    } 
} 

在這種情況下使用閉包是絕對邪惡的嗎?當然,我可以使用AtomicInteger而不是Int,或者在某些使用netty的網絡場景中,在threadsafe頻道上發出寫入操作,但這不是我的觀點。

要問可笑的風險:有沒有對未來的一個的onSuccess方式執行的,而不是一個匿名的中層演員這個演員,確定在的情況下沒有接收功能?

編輯

說得比較清楚,我的問題是:有沒有辦法迫使一系列期貨在同一個線程中給定的演員來運行?

回答

5

的問題是,onSuccess是要在不同的線程比線程演員的receive是要在運行運行。你可以使用pipeTo方法,或者使用Agent。製作counterAtomicInteger可以解決問題,但它並不那麼幹淨 - 也就是說,它打破了Actor模型。

+1

建議使用代理的+1 – gsimard

5

實現這樣的設計,最簡單的方法是使用「發射後不管」的語義:

class Accumulator extends Actor { 
    private[this] var counter = 0 

    def receive = { 
    case Start => ActorTest.genRef ! Request 
    case x: Int => { 
     counter += x 
     Log.info("Accumulator", "counter = " + counter) 
     self ! Start 
    } 
    } 
} 

該解決方案是完全異步的,你不需要任何超時。

+0

是的,如果我放棄使用Futures,這是有效的。在我的例子中,鏈接期貨很容易成爲可能,如果它們以_pipeTo self_結尾,但不可能再用「即燃即用」語義。相反,我必須在累加器的接收函數中定義N箇中間消息,以保證代碼在該Actor的線程中運行。我想我可以再問一次,這次更清楚一點:是否有辦法迫使一系列期貨在與給定的演員相同的線程中運行? – gsimard

+0

爲什麼你需要保證'Accumulator' Actor總是在同一個線程中運行?這似乎違背了演員模型哲學。期貨也是如此:應該派出一個線程池,以最大化表現。如果他們都在同一個線程中運行,你只需要一個簡單的順序程序,並且你不再需要期貨... – paradigmatic

+0

實際上,它是否在同一個線程中運行並不重要,要求是更多的是它應該按順序運行,就像處理單個actor的消息一樣。這不違背演員模型,它是演員模型。 – gsimard