我目前正在使用註冊過程處理應用程序。此註冊過程在某些時候將以異步方式與外部系統進行通信。爲了使這個問題簡潔明瞭,我顯示你,我已經寫了兩個重要角色:兒童演員,期貨和例外
SignupActor.scala
class SignupActor extends PersistentFSM[SignupActor.State, Data, DomainEvt] {
private val apiActor = context.actorOf(ExternalAPIActor.props(new HttpClient))
// At a certain point, a CreateUser(data) message is sent to the apiActor
}
ExternalAPIActor.scala
class ExternalAPIActor(apiClient: HttpClient) extends Actor {
override def preRestart(reason: Throwable, message: Option[Any]) = {
message.foreach(context.system.scheduler.scheduleOnce(3 seconds, self, _))
super.preRestart(reason, message)
}
def receive: Receive = {
case CreateUser(data) =>
Await.result(
apiClient.post(data)
.map(_ => UserCreatedInAPI())
.pipeTo(context.parent),
Timeout(5 seconds).duration
)
}
}
這設置似乎按預期工作。當外部API出現問題(例如超時或網絡問題)時,由HttpClient::post
返回的Future
將失敗,並且將由於Await.result
而導致例外。這又歸功於SignupActor
親代演員的SupervisorStrategy
,它將重新啓動ExternalAPIActor
,在那裏我們可以用最小的延遲重新發送最後一條消息,以避免死鎖。
我看到一對夫婦的問題與此設置:
- 內
ExternalAPIActor
的receive
方法,發生阻塞。據我所知,在演員陣營內被視爲反模式。 - 用於重新發送消息的延遲是靜態的。如果API長時間不可用,我們將每3秒繼續發送一次HTTP請求。我想在這裏採用某種指數退避機制。
要繼續對後者,我試過在SignupActor
如下:
SignupActor.scala
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
ExternalAPIActor.props(new HttpClient),
childName = "external-api",
minBackoff = 3 seconds,
maxBackoff = 30 seconds,
randomFactor = 0.2
)
)
private val apiActor = context.actorOf(supervisor)
不幸的是,這似乎並沒有做任何事情根本不調用ExternalAPIActor
的preRestart
方法。當用Backoff.onStop
代替Backoff.onFailure
時,preRestart
方法被調用,但根本沒有任何種類的指數退避。
鑑於上述情況,我的問題如下:
- 是使用
Await.result
推薦的(唯一?)的方式,使在Future
拋出肯定異常,由於所謂的演員中的服務返回被發現並小心處理?我的特殊用例中一個特別重要的部分是,當出現錯誤時不應該丟棄消息,而是重試消息。或者是否有其他(慣用)的方法在異步環境中拋出的異常應該在Actors中處理? - 在這種情況下,如何使用
BackoffSupervisor
?再次說明:負責異常的消息不會丟失是非常重要的,但重試直到N次(由SupervisorStrategy
的maxRetries
參數確定。
我喜歡這個標題。 – Beta