2013-12-19 119 views
17

在Akka中,我沒有使用onComplete來創建用?創建的未來響應,而是嘗試使用pipeTo,因爲它被認爲是首選模式。但是,當未來超時時,我似乎沒有收到任何Throwable或Failures。如果在使用pipeTo時發生超時,我應該期望在演員中獲得什麼?什麼時候拋出一個不同的異常?示例代碼:pipeTo在超時或其他故障時發送什麼消息?

class Simple(otherActor : ActorRef) extends Actor{ 
    def receive = { 
    case "some_msg" => { 
     val implicit timeout = Timeout(1 seconds) 
     val response = otherActor ? "hello" 
     response pipeTo self 
    } 

    // case ??? // How do I handle timeouts? 
    } 
} 

如果在超時發生時沒有消息自動發送,那麼我應該如何處理pipeTo超時?

回答

23

未來的失敗以包含異常的akka.actor.Status.Failure消息的形式發送。超時的例外是akka.pattern.AskTimeoutException

+0

是否有可能從超時消息中獲得原始消息? – hakunami

11

如果您的示例與您的實際代碼非常接近,那麼我不確定pipeTo是您想要的。把信息傳遞給你自己,對我來說,沒有太大意義,對於演員向另一個演員發送信息然後等待迴應的情況,有更好的解決方案。首先,我們來談談pipeTo。我認爲一個很好的例子是什麼時候使用pipeTo是,如果你有三個演員,A,B和C.A向B發送一條消息,然後B向C發送一條消息,而來自C的響應應該在B之後返回給A首先做一些其他事情。在這個例子中,你可以做這樣的事情裏面B的:

val fut = actorC ? someMessage 
fut map(someMapFunc) pipeTo sender 

這裏,pipeTo功能可以防止意外關閉在可變sender變種,如果你要改用類似onComplete和響應該回調中的sender。現在

,對於你的情況,如果你只是想談談B,然後等待B中的響應(和處理潛在的超時),你可以嘗試這樣的事:

class ActorA extends Actor{ 
    import context._ 
    val myB = context.actorOf(Props[ActorB]) 

    def receive = { 
    case msg => 
     myB ! msg 
     setReceiveTimeout(2 seconds) 
     become(waitingForResponse) 
    } 

    def waitingForResponse:Receive = { 
    case ReceiveTimeout => 
     println("got a receive timeout") 
     cancelReceiveTimeout 

    case response => 
     println("got my response back") 
     cancelReceiveTimeout 
    } 

    def cancelReceiveTimeout = setReceiveTimeout(Duration.Undefined) 
} 

在這例如,A以默認的receive部分函數開始。當它收到一條消息時,它會向B發送另一條消息,設置接收超時以接收來自B的響應,然後將它的功能切換爲特定於等待來自B的響應的東西。在那個新的接收函數中,我可以要麼及時得到B的回覆,要麼得到ReceiveTimeout,這表明我沒有及時得到我的回覆。無論哪種情況,我都會因爲重複而取消接收超時。

現在這是非常簡單的,但我只是試圖展示一種方式來做兩個演員之間來回,這似乎是你的例子顯示。

+0

這個答案很有幫助,pipeTo可能不適合我描述的情況,但我主要只是想知道在使用pipeTo時錯誤是如何處理的。 – mushroom

相關問題