2017-10-09 117 views
1

我目前正在使用註冊過程處理應用程序。此註冊過程在某些時候將以異步方式與外部系統進行通信。爲了使這個問題簡潔明瞭,我顯示你,我已經寫了兩個重要角色:兒童演員,期貨和例外

SignupActor.scala

class SignupActor extends PersistentFSM[SignupActor.State, Data, DomainEvt] { 
    private val apiActor = context.actorOf(ExternalAPIActor.props(new HttpClient)) 

    // At a certain point, a CreateUser(data) message is sent to the apiActor 
} 

ExternalAPIActor.scala

class ExternalAPIActor(apiClient: HttpClient) extends Actor { 
    override def preRestart(reason: Throwable, message: Option[Any]) = { 
     message.foreach(context.system.scheduler.scheduleOnce(3 seconds, self, _)) 
     super.preRestart(reason, message) 
    } 

    def receive: Receive = { 
     case CreateUser(data) => 
      Await.result(
       apiClient.post(data) 
        .map(_ => UserCreatedInAPI()) 
        .pipeTo(context.parent), 
       Timeout(5 seconds).duration 
      ) 
    } 
} 

這設置似乎按預期工作。當外部API出現問題(例如超時或網絡問題)時,由HttpClient::post返回的Future將失敗,並且將由於Await.result而導致例外。這又歸功於SignupActor親代演員的SupervisorStrategy,它將重新啓動ExternalAPIActor,在那裏我們可以用最小的延遲重新發送最後一條消息,以避免死鎖。

我看到一對夫婦的問題與此設置:

  • ExternalAPIActorreceive方法,發生阻塞。據我所知,在演員陣營內被視爲反模式。
  • 用於重新發送消息的延遲是靜態的。如果API長時間不可用,我們將每3秒繼續發送一次HTTP請求。我想在這裏採用某種指數退避機制。

要繼續對後者,我試過在SignupActor如下:

SignupActor.scala

val supervisor = BackoffSupervisor.props(
    Backoff.onFailure(
     ExternalAPIActor.props(new HttpClient), 
     childName = "external-api", 
     minBackoff = 3 seconds, 
     maxBackoff = 30 seconds, 
     randomFactor = 0.2 
    ) 
) 

private val apiActor = context.actorOf(supervisor) 

不幸的是,這似乎並沒有做任何事情根本不調用ExternalAPIActorpreRestart方法。當用Backoff.onStop代替Backoff.onFailure時,preRestart方法被調用,但根本沒有任何種類的指數退避。

鑑於上述情況,我的問題如下:

  • 是使用Await.result推薦的(唯一?)的方式,使在Future拋出肯定異常,由於所謂的演員中的服務返回被發現並小心處理?我的特殊用例中一個特別重要的部分是,當出現錯誤時不應該丟棄消息,而是重試消息。或者是否有其他(慣用)的方法在異步環境中拋出的異常應該在Actors中處理?
  • 在這種情況下,如何使用BackoffSupervisor?再次說明:負責異常的消息不會丟失是非常重要的,但重試直到N次(由SupervisorStrategymaxRetries參數確定。
+0

我喜歡這個標題。 – Beta

回答

3

是使用Await.result推薦的(唯一?)的方式,以確保 例外在將來從名爲 演員中的服務返回被抓獲和相應的處理拋出?

不是。通常情況下,您不想在阿卡處理故障。更好的選擇是管道故障,自己的演員,避免了需要使用Await.result都:

def receive: Receive = { 
    case CreateUser(data) => 
    apiClient.post(data) 
     .map(_ => UserCreatedInAPI()) 
     .pipeTo(self) 
    case Success(res) => context.parent ! res 
    case Failure(e) => // Invoke retry here 
} 

這意味着沒有需要重新啓動才能處理失敗,他們的正常流動的所有部分你演員。

處理此問題的另一種方法可以是創建「受監管的未來」。從this blog post摘自:

object SupervisedPipe { 

    case class SupervisedFailure(ex: Throwable) 
    class SupervisedPipeableFuture[T](future: Future[T])(implicit executionContext: ExecutionContext) { 
    // implicit failure recipient goes to self when used inside an actor 
    def supervisedPipeTo(successRecipient: ActorRef)(implicit failureRecipient: ActorRef): Unit = 
     future.andThen { 
     case Success(result) => successRecipient ! result 
     case Failure(ex) => failureRecipient ! SupervisedFailure(ex) 
     } 
    } 

    implicit def supervisedPipeTo[T](future: Future[T])(implicit executionContext: ExecutionContext): SupervisedPipeableFuture[T] = 
    new SupervisedPipeableFuture[T](future) 

    /* `orElse` with the actor receive logic */ 
    val handleSupervisedFailure: Receive = { 
    // just throw the exception and make the actor logic handle it 
    case SupervisedFailure(ex) => throw ex 
    } 

    def supervised(receive: Receive): Receive = 
    handleSupervisedFailure orElse receive 
} 

這樣,你只能管自己,一旦你得到一個Failure,否則將其發送到演員本來是郵件被髮送到,避免了我加入case Success需要receive方法。您所需要做的就是將supervisedPipeTo替換爲pipeTo提供的原始框架。

+0

感謝您的回答Yuval。將結果重新映射回演員本身看起來就像沿着正確的道路走下去。然而,在你的「受監管的未來」例子中,是否會丟失對「SupervisedFailure」消息負責的原始消息? –

+1

在第一個代碼片段的'receive'方法中,'pipeTo(self)'將'Future'的_result_發送給'self',所以你應該在'UserCreatedInAPI'和'akka.actor.Status上進行模式匹配。失敗「而不是」成功「和」失敗「。 – chunjef

0

好吧,我已經做了一些更多的思考和修補,我已經提出了以下幾點。

ExternalAPIActor.scala

class ExternalAPIActor(apiClient: HttpClient) extends Actor with Stash { 
     import ExternalAPIActor._ 

     def receive: Receive = { 
      case msg @ CreateUser(data) => 
       context.become(waitingForExternalServiceReceive(msg)) 
       apiClient.post(data) 
        .map(_ => UserCreatedInAPI()) 
        .pipeTo(self) 
     } 

     def waitingForExternalServiceReceive(event: InputEvent): Receive = LoggingReceive { 
      case Failure(_) => 
       unstashAll() 
       context.unbecome() 
       context.system.scheduler.scheduleOnce(3 seconds, self, event) 

      case msg:OutputEvent => 
       unstashAll() 
       context.unbecome() 
       context.parent ! msg 

      case _ => stash() 
     } 
} 

object ExternalAPIActor { 
    sealed trait InputEvent 
    sealed trait OutputEvent 

    final case class CreateUser(data: Map[String,Any]) extends InputEvent 
    final case class UserCreatedInAPI() extends OutputEvent 
} 

我已經使用這種技術來防止萬一丟失原始郵件有什麼問題我們調用外部服務。在請求到外部服務的過程中,我切換上下文,等待出現故障的響應,然後再切換回來。由於Stash特質,我可以確保其他對外部服務的請求不會丟失。

因爲我有我的應用程序調用外部服務的多個演員,我中提取出waitingForExternalServiceReceive自身的特點:

WaitingForExternalService.scala現在

trait WaitingForExternalServiceReceive[-tInput, +tOutput] extends Stash { 

    def waitingForExternalServiceReceive(event: tInput)(implicit ec: ExecutionContext): Receive = LoggingReceive { 
    case akka.actor.Status.Failure(_) => 
     unstashAll() 
     context.unbecome() 
     context.system.scheduler.scheduleOnce(3 seconds, self, event) 

    case msg:tOutput => 
     unstashAll() 
     context.unbecome() 
     context.parent ! msg 

    case _ => stash() 
    } 
} 

,該ExternalAPIActor可以延長這個特質:

ExternalAPiactor.scala

class ExternalAPIActor(apiClient: HttpClient) extends Actor with WaitingForExternalServiceReceive[InputEvent,OutputEvent] { 
     import ExternalAPIActor._ 

     def receive: Receive = { 
      case msg @ CreateUser(data) => 
       context.become(waitingForExternalServiceReceive(msg)) 
       apiClient.post(data) 
        .map(_ => UserCreatedInAPI()) 
        .pipeTo(self) 
     } 
} 

object ExternalAPIActor { 
    sealed trait InputEvent 
    sealed trait OutputEvent 

    final case class CreateUser(data: Map[String,Any]) extends InputEvent 
    final case class UserCreatedInAPI() extends OutputEvent 
} 

現在,如果發生故障/錯誤並且消息沒有丟失,演員將不會重新啓動。更重要的是,演員現在的整個流程都是非阻塞的。

此設置(最有可能)遠非完美,但它似乎完全按照我需要的方式工作。