2015-04-22 55 views
7

我正在使用Spray應用程序中的ask模式調用Actor,並將結果作爲HTTP響應返回。我將演員的失敗映射到自定義錯誤代碼。在發生故障時解決Akka期貨問題

val authActor = context.actorOf(Props[AuthenticationActor]) 

callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user => 
    complete(StatusCodes.OK, user) 
} 

def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = { 
onComplete(f) { 
    case Success(value: T) => cb(value) 
    case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage) 
    case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.") 
    //In reality this returns a custom error object. 
} 
} 

這正常工作時authActor發送故障,但如果authActor拋出一個異常,沒有任何反應,直到問超時完成。例如:

override def receive: Receive = { 
    case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token") 
} 

我知道阿卡文檔說

不同,需要發送一個失敗消息發送者例外完成未來。 這不會自動完成當一個actor在處理消息時拋出一個異常。

但是鑑於我使用了很多的噴射佈線演員和服務演員之間的接口,我寧願不用每個孩子演員的try/catch包裝接收部分。是否有更好的方法來實現自動處理子actor中的異常,並在發生異常時立即解決未來?

編輯:這是我目前的解決方案。然而,爲每個孩子演員做這件事都很麻煩。

override def receive: Receive = { 
case default => 
    try { 
    default match { 
     case _ => throw new ServiceException("")//Actual code would go here 
    } 
    } 
    catch { 
    case se: ServiceException => 
     logger.error("Service error raised:", se) 
     sender ! Failure(se) 
    case ex: Exception => 
     sender ! Failure(ex) 
     throw ex 
    } 
} 

這樣,如果它是一個預期的錯誤(即ServiceException),它通過創建失敗來處理。如果它是意外的,它會立即返回一個失敗,以便未來得到解決,但是會拋出異常,因此它仍然可以由SupervisorStrategy處理。

+0

那麼......拋出異常之前發送失敗消息。 –

+0

這就是我不想做的 - 我說**我寧願不用每個孩子演員的try/catch **包裝接收部分。這是一個玩具的例子,我完全有可能不控制拋出異常的位置。 –

+0

呃......你知道...... resilent分佈式系統的基本路線之一就是「明確地製造錯誤」。想想可能發生的各種錯誤...讓他們明確。如果你可以有「TypeSafe」錯誤...更好。 –

回答

13

如果你想要的方式來提供自動在意外的異常的情況下返回給發送者的響應發送,那麼這樣的事情可能會爲你工作:

trait FailurePropatingActor extends Actor{ 
    override def preRestart(reason:Throwable, message:Option[Any]){ 
    super.preRestart(reason, message) 
    sender() ! Status.Failure(reason) 
    } 
} 

我們覆蓋preRestart和傳播失敗作爲Status.Failure返回發件人,這將導致上游Future失敗。另外,在這裏撥打super.preRestart這很重要,因爲這是孩子停止發生的地方。在演員使用,這看起來是這樣的:

case class GetElement(list:List[Int], index:Int) 
class MySimpleActor extends FailurePropatingActor { 
    def receive = { 
    case GetElement(list, i) => 
     val result = list(i) 
     sender() ! result 
    } 
} 

如果我打電話給這個演員的一個實例,像這樣:

import akka.pattern.ask 
import concurrent.duration._ 

val system = ActorSystem("test") 
import system.dispatcher 
implicit val timeout = Timeout(2 seconds) 
val ref = system.actorOf(Props[MySimpleActor]) 
val fut = ref ? GetElement(List(1,2,3), 6) 

fut onComplete{ 
    case util.Success(result) => 
    println(s"success: $result") 

    case util.Failure(ex) => 
    println(s"FAIL: ${ex.getMessage}") 
    ex.printStackTrace()  
}  

然後它會正常打我Failure塊。現在,當Futures不參與擴展該特質的演員時,該基本特徵中的代碼運行良好,就像這裏的簡單演員。但是如果您使用Future s,則需要小心,因爲在Future中發生的例外情況不會導致演員重新啓動,並且在preRestart中,sender()的呼叫也不會返回正確的參考,因爲演員已經進入下一條消息。像這樣的演員表明問題:

class MyBadFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 

    def receive = { 
    case GetElement(list, i) => 
     val orig = sender() 
     val fut = Future{ 
     val result = list(i) 
     orig ! result 
     }  
    } 
} 

如果我們在前面的測試代碼中使用這個演員,我們總是在失敗的情況下超時。爲了減輕這一點,你需要管期貨的結果返回給像這樣發件人:

class MyGoodFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut pipeTo sender() 
    } 
} 

在這種特殊情況下,演員本身不會重新啓動,因爲它並沒有遇到未捕獲的異常。現在,如果你的演員所需要的未來後做一些額外的處理,你可以管回自我,明確當你得到一個Status.Failure失敗:

class MyGoodFutureUsingActor extends FailurePropatingActor{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut.to(self, sender()) 

    case d:Double => 
     sender() ! d * 2 

    case Status.Failure(ex) => 
     throw ex 
    } 
} 

如果這種行爲變得普遍,你可以把它提供給任何演員需要它是這樣的:

trait StatusFailureHandling{ me:Actor => 
    def failureHandling:Receive = { 
    case Status.Failure(ex) => 
     throw ex  
    } 
} 

class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{ 
    import context.dispatcher 
    import akka.pattern.pipe 

    def receive = myReceive orElse failureHandling 

    def myReceive:Receive = { 
    case GetElement(list, i) => 
     val fut = Future{ 
     list(i) 
     } 

     fut.to(self, sender()) 

    case d:Double => 
     sender() ! d * 2   
    } 
} 
+1

這看起來不錯,謝謝! –