2017-07-28 74 views
0

我目前正在使用Akka actors,streaming和http的組合來處理客戶端告訴服務器想要接收事件的速度有多快,並且服務器發送客戶端事件的速度。停止Akka演員在WebSocket斷開連接或空閒時間後

這裏的客戶端消息到服務器:

{ 
"uuid": "d30711c6-6bbf-4f11-9471-638ef7e19dfd", 
"startTime": "1501254050", 
"rate": "1.5" 
} 

這是由看起來像這樣一個途徑接受:

path("ws") { // /ws path allows websocket connections 
    get { 
     extractUpgradeToWebSocket { upgrade => 
      complete(upgrade.handleMessagesWithSinkSource(replaySink, Source.maybe)) 
     } 
    } 
    } 

replaySink只是解析JSON成一個案例類,它發送給監督演員使用:

val replaySupervisor = system.actorOf(ReplaySupervisor.props(), "replay-supervisor") 
val replaySink: Sink[Message, NotUsed] = Flow[Message] 
    .map{ 
     case tm: TextMessage.Strict => 
     val msgJson = parse(tm.getStrictText).getOrElse(Json.Null) 
     msgJson.as[ReplayRequest].getOrElse("JSONParsingError") 
     case _ => log.error("Incoming message not in proper format") 
    } 
    .to(Sink.actorRefWithAck(
     ref = replaySupervisor, 
     onInitMessage = InitMessage, 
     ackMessage = AckMessage, 
     onCompleteMessage = ConnectionClosed 
     ) 
    ) 

ConnectionClosed消息僅在WebSocket連接超時後纔會發送,因爲它實際上是一個無限制的源。

當主管接收到消息時,它創建新的子actor,給它提供客戶端提供的UUID,然後傳遞消息。或者如果孩子已經存在(因爲客戶端必須通過定期發送相同的事件來保持連接打開,或者因爲它改變了開始時間/速率併發送新消息),它只是將它傳遞給它。

我的問題是,一旦連接關閉,我該如何告訴孩子演員停止?上面的接收器不知道哪個小孩最終收到了它的消息,所以我不能發送帶有UUID的ConnectionClosed消息給主管轉發,告訴小孩演員停止自己。發送PoisonPill只會殺死主管。我能想到的最好方法是使用設置孩子的ReceiveTimeout屬性在一段時間內沒有收到事件後自行停止,並讓客戶端發送常規消息。

是否有一種更優雅的方式可以阻止連接關閉時刻的演員?

+0

你是什麼意思,「如果孩子已經存在」?在連接期間,UUID是否可以出現在多條消息中? – chunjef

+0

是的,當連接打開時,客戶端必須定期發送帶有其UUID的消息以保持連接處於活動狀態。客戶端也可以在連接打開時更改開始時間/速率,因此也會在這種情況下傳遞。 – Brideau

+0

因此,當一個特定的連接打開時,主管將看到多個消息中的uuid1和多個消息中的uuid2。一個孩子處理所有'uuid1'消息,另一個孩子處理所有'uuid2'消息。我的理解是否正確? – chunjef

回答

0

Sink.actorRef被實現時,它在內部創建一個actor,這個actor將成爲你的actor的消息發送者。

您可以在發件人ActorRef中爲發件人保留一張地圖,發件人爲InitMessage。當您收到ConnectionClosed(或Failed發送失敗)時,發件人將保持不變,您可以查找正確的孩子並關閉它。

另一種選擇可能是在路線中生成唯一的ID(例如UUID),並提供InitMessage(id)ConnectionClosed(id)

相關問題