我目前正在使用Akka actors,streaming和http的組合來處理客戶端告訴服務器想要接收事件的速度有多快,並且服務器發送客戶端事件的速度。停止Akka演員在WebSocket斷開連接或空閒時間後
這裏的客戶端消息到服務器:
{
"uuid": "d30711c6-6bbf-4f11-9471-638ef7e19dfd",
"startTime": "1501254050",
"rate": "1.5"
}
這是由看起來像這樣一個途徑接受:
path("ws") { // /ws path allows websocket connections
get {
extractUpgradeToWebSocket { upgrade =>
complete(upgrade.handleMessagesWithSinkSource(replaySink, Source.maybe))
}
}
}
的replaySink
只是解析JSON成一個案例類,它發送給監督演員使用:
val replaySupervisor = system.actorOf(ReplaySupervisor.props(), "replay-supervisor")
val replaySink: Sink[Message, NotUsed] = Flow[Message]
.map{
case tm: TextMessage.Strict =>
val msgJson = parse(tm.getStrictText).getOrElse(Json.Null)
msgJson.as[ReplayRequest].getOrElse("JSONParsingError")
case _ => log.error("Incoming message not in proper format")
}
.to(Sink.actorRefWithAck(
ref = replaySupervisor,
onInitMessage = InitMessage,
ackMessage = AckMessage,
onCompleteMessage = ConnectionClosed
)
)
該ConnectionClosed
消息僅在WebSocket連接超時後纔會發送,因爲它實際上是一個無限制的源。
當主管接收到消息時,它創建新的子actor,給它提供客戶端提供的UUID,然後傳遞消息。或者如果孩子已經存在(因爲客戶端必須通過定期發送相同的事件來保持連接打開,或者因爲它改變了開始時間/速率併發送新消息),它只是將它傳遞給它。
我的問題是,一旦連接關閉,我該如何告訴孩子演員停止?上面的接收器不知道哪個小孩最終收到了它的消息,所以我不能發送帶有UUID的ConnectionClosed消息給主管轉發,告訴小孩演員停止自己。發送PoisonPill只會殺死主管。我能想到的最好方法是使用設置孩子的ReceiveTimeout屬性在一段時間內沒有收到事件後自行停止,並讓客戶端發送常規消息。
是否有一種更優雅的方式可以阻止連接關閉時刻的演員?
你是什麼意思,「如果孩子已經存在」?在連接期間,UUID是否可以出現在多條消息中? – chunjef
是的,當連接打開時,客戶端必須定期發送帶有其UUID的消息以保持連接處於活動狀態。客戶端也可以在連接打開時更改開始時間/速率,因此也會在這種情況下傳遞。 – Brideau
因此,當一個特定的連接打開時,主管將看到多個消息中的uuid1和多個消息中的uuid2。一個孩子處理所有'uuid1'消息,另一個孩子處理所有'uuid2'消息。我的理解是否正確? – chunjef