2017-06-19 26 views
1

我想了解如何在操作中使用有狀態的演員時處理異步調用。如何確保在Akka中使用期貨時的消息一致性

考慮下面的演員:

@Singleton 
class MyActor @Inject()() extends Actor with LazyLogging { 
    import context.dispatcher 
    override def receive: Receive = { 
    case Test(id: String) => 
     Future { logger.debug(s"id [$id]") } 
    } 
} 

和調用該演員:

Stream.range(1, 100).foreach { i => 
    MyActor ! Test(i.toString) 
} 

這會給我的一系列不一致的印刷。

我該如何在演員中使用未來,而不會丟失整個「一個接一個」的功能?

+0

如果你在'Future'部分前面加上'println(s「received $ id)'',你應該看到id以正確的順序到達,對吧? – mfirry

回答

2

您應該將該Future存儲在var中,然後在每個下一條消息中進行flatMap調用。

if(storedFut == null) storedFut = Future { logger.debug(s"id [$id]") } 
else storedFut = storedFut.flatMap(_ => Future { logger.debug(s"id [$id]") }) 

flatMap正好適用於期貨定單。

旁註

如果你想的事情並行發生你在不確定的區域,在那裏你不能強行訂購

+0

不太確定這是天才還是非常糟糕 – Gleeb

+0

這實際上取決於你想達到什麼。期貨是異步的,並且執行器上的計劃可以花費任意長的時間,所以如果你不'flatMap'你鬆散的訂單。在大多數情況下,我已經看到'Future'計算一些值,然後這個值被髮回給原來的演員或其他人。在這種情況下,你可以使用'pipeTo'。當你發回它時,由你的演員來保持排序,這樣你就可以舉行一個全球計數器,併爲每個消息增加它,並附加到未來使用'地圖'例如。 – almendar

+0

從這件事情中發現了一些內存泄漏的氣味,那條鏈上的所有已完工未來將會發生什麼? – Gleeb

-1

另一種方法是將自身發送的消息上Future.onComplete假設有處理的順序沒有限制

//in receive 
    val future = Future { logger.debug(s"id [$id]") } 
    f.onComplete { 
     case Success(value) => self ! TestResult(s"Got the callback, meaning = $value") 
     case Failure(e) => self ! TestError(e) 
     } 
+1

因爲期貨不會按任何訂單完成,TestResults也不會被定購 – Gleeb

2

什麼你觀察並不違反阿卡的"message ordering per sender–receiver pair"擔保;您正在觀察的是Future的非確定性性質,正如@almendar在他或她的回答中提到的那樣。

Test消息被髮送到MyActor順序,從Test("1")Test("100"),和MyActor在相同的順序其receive塊處理每個消息。但是,您將每個郵件記錄在Future之內,並且這些Futures的完成順序不確定。這就是爲什麼你會看到「系列打印不一致」。

要獲得消息的順序記錄所需的行爲,請不要將記錄包裝在Future中。如果您必須在演員的receive區塊內使用Future,則@ almendar在演員內部使用var的方法是安全的。