Akka Http中的Web Socket連接被視爲Akka Streams Flow
。這似乎對於基本的請求回覆非常有效,但是當消息也應該通過WebSocket發送時,它會變得更加複雜。我的服務器的核心看起來有點像:異步終止Akka-Http Web Socket連接
lazy val authSuccessMessage = Source.fromFuture(someApiCall)
lazy val messageFlow = requestResponseFlow
.merge(updateBroadcastEventSource)
lazy val handler = codec
.atop(authGate(authSuccessMessage))
.join(messageFlow)
handleWebSocketMessages {
handler
}
這裏,codec
是(反)序列BidiFlow
和authGate
是一個BidiFlow
其處理認證消息和防止任何消息流出,直到授權成功。一旦成功,它會發送authSuccessMessage
作爲回覆。 requestResponseFlow
是標準的請求 - 回覆模式,並且updateBroadcastEventSource
混合在異步推送消息中。
我希望能夠在某些情況下正常發送錯誤消息並正常終止連接,如錯誤授權,someApiCall
失敗或由requestResponseFlow
處理的錯誤請求。所以基本上,基本上,我似乎希望能夠異步完成messageFlow
以及最後一條消息,儘管其他組成流程仍然存在。
如果我可能會問,爲什麼要關閉讓一個懶惰的val? –
@ViktorKlang我有一個'case class'中的整個web套接字處理程序,並且由於初始化順序而導致崩潰,我不想擔心它。 – acjay
當然,但這個例子並不需要。 :) –