2012-07-26 42 views
5

這是我的場景:Akka onReceive方法是否同時執行?

我有一個主演員,它接收來自多個孩子演員的消息。這些消息包含要彙總的數據。在這種聚合邏輯中,如果我使用共享數據結構來收集聚合,是否需要處理同步問題?

else if(arg0 instanceof ReducedMsg){ 

          ReducedMsg reduced = (ReducedMsg)arg0; 
     counter.decrementAndGet(); 

     synchronized(finalResult){ 

      finalResult.add((KeyValue<K, V>) reduced.getReduced()); 

      if(counter.get() == 0){ 
            if(checkAndReduce(finalResult)){ 

        finalResult.clear(); 
       } 
       else{ 
        stop(); 
        latch.countDown(); 
       } 

      } 

     } 



    } 

所以你可以看到我有一個finalResult,對每個信息將彙總和處理邏輯之後需求,以及要清除的集合。

其實我試圖實現的是一個遞歸(關聯)減少mapreduce。所以我需要保持我假設的同步塊?或者是否有可能Akka一次執行onReceive一個線程?

該邏輯在小數據集上產生準確和可預測的結果。我的問題是,當我的輸入數據集有點大,代碼掛起。我想確定這是因爲我的同步塊的上下文切換,所以我可能會遇到不同的設計。

回答

14

onReceive()從來沒有同時調用。這是Akka給你的最基本的保證。

這意味着,如果你的counter變量是一個領域的演員,沒有其他的代碼都可以直接訪問該字段,你可以放心地使用正常int/long代替AtomicInteger/AtomicLong。同樣在finalResult上同步也不是必須的,假設它是一個封裝並隱藏在actor中的字段。

最後,CountDownLatch的使用是可疑的。在Akka應用程序中,您不應該使用任何同步原語。參與者基本上是單線程的,所有通信(包括喚醒和傳遞數據)都應該通過消息傳遞來實現。

這是在文檔中的所有解釋:http://doc.akka.io/docs/akka/2.0.2/general/jmm.html#Actors_and_the_Java_Memory_Model

+0

謝謝托馬斯。你的第一行清除了我的許多疑問! Regd鎖的使用,我不得不這樣做,以提供一個客戶端接口,應該等待,直到演員處理完成。我的目標是開發一個java框架,內部使用Akka/Scala進行處理。 – 2012-07-26 16:47:51

+0

@sutanudalui:你可以同時調用actor *,這意味着Akka會等待一些臨時隊列的響應。無需手動執行此操作。請教關於'ask'(而不是'tell')消息模式的Akka docs。 – 2012-07-26 16:52:15

+0

好的。我會更深入一點。我有一個有N個奴隸演員的循環路由器。我打算做的是並行處理,然後累積結果。因此,接收到每個輸入的主演員將路由到其中一個從屬設備。在處理消息時,從屬設備將消息發回給需要聚合的主設備。這是我在考慮同步問題時的聚合階段。從提供的文檔鏈接中,我看到Akka無法保證(我沒有人能猜到!)共享內存,在我的情況下,「finalResult」將受到保護。我是否正確理解這一點? – 2012-07-26 17:01:34

相關問題