2017-02-21 178 views
2

在我的項目中,我使用Akka的演員。通過定義演員是線程安全的,這意味着,在演員的接收方法Akka的演員異步消息處理

def receive = { 
    case msg => 
     // some logic here 
} 

一次僅一個線程處理的註釋一段代碼。然而,事情開始變得更加複雜時,這種代碼是異步的:

def receive = { 
    case msg => 
     Future { 
      // some logic here 
     } 
} 

如果我理解這個正確的,在這種情況下,只有未來的結構將同步,可以這麼說,而不是內部邏輯未來。

當然,我可能會阻止未來:

def receive = { 
    case msg => 
     val future = Future { 
      // some logic here 
     } 
     Await.result(future, 10.seconds) 
} 

解決了這個問題,但我認爲我們都應該同意,這是難以接受的解決方案。

所以這是我的問題:如何在異步計算的情況下保留演員的線程安全特性沒有阻止Scala的未來?

+0

在http://docs.scala-lang.org/overviews/core/futures.html描述只需添加一個回調到未來。回叫應該向這個或另一個演員發送消息。 –

回答

5

我如何可以保留 異步計算的情況下,演員的線程安全的特性,而不塊Scalas Future

這個假設只有在你修改Future中的actor的內部狀態時纔是正確的,這似乎是一種設計氣味。只有通過創建數據和管道的副本導致的計算使用pipeTo演員的使用​​未來的計算。一旦演員收到你可以在上面安全地操作計算的結果是:

import akka.pattern.pipe 

case class ComputationResult(s: String) 

def receive = { 
    case ComputationResult(s) => // modify internal state here 
    case msg => 
    Future { 
     // Compute here, don't modify state 
     ComputationResult("finished computing") 
    }.pipeTo(self) 
} 
+0

實際上Future內部的邏輯涉及到Mongo數據庫。我們正在使用ReactiveMongo,它是異步的(因此描述的問題)。所以我必須確保只有在處理完上一條消息後才處理下一條消息,這意味着應該已經解析了對數據庫的異步調用。 –

+0

@SergeyVolkov如果這是問題,那麼這完全與Akka actors沒有關係,這是爲了讓你的數據庫在處理下一個數據庫之前完成它的事務。在這種情況下,您可以實現一個阻塞隊列邏輯,它只在完成時處理隊列中的下一個項目。 –

0

我想你需要先「解決」的分貝查詢,然後根據結果返回一個新Future。如果數據庫查詢返回Future[A],那麼你可以使用flatMapA操作,並返回一個新Future。東西的

def receive = { 
    case msg => 
     val futureResult: Future[Result] = ... 
     futureResult.flatMap { result: Result => 
      // .... 
      // return a new Future 
     } 
} 
0

這裏簡單的解決方案的線路是把演員進入狀態機(使用AkkaFSM)並執行以下操作:

  • 分派未來MongoDB的請求。
  • 使用對你自己的演員的引用來與你的演員交流
  • 從未來告訴消息。

根據上下文,你可能需要做一些更加得到適當的反應。

但是,這裏有你處理與演員的狀態消息的優勢,請你爲你自己的線程,你可以變異的演員狀態。