2016-05-03 57 views
0

Akka Http中的Web Socket連接被視爲Akka Streams Flow。這似乎對於基本的請求回覆非常有效,但是當消息也應該通過WebSocket發送時,它會變得更加複雜。我的服務器的核心看起來有點像:異步終止Akka-Http Web Socket連接

lazy val authSuccessMessage = Source.fromFuture(someApiCall) 

lazy val messageFlow = requestResponseFlow 
    .merge(updateBroadcastEventSource) 

lazy val handler = codec 
    .atop(authGate(authSuccessMessage)) 
    .join(messageFlow) 

handleWebSocketMessages { 
    handler 
} 

這裏,codec是(反)序列BidiFlowauthGate是一個BidiFlow其處理認證消息和防止任何消息流出,直到授權成功。一旦成功,它會發送authSuccessMessage作爲回覆。 requestResponseFlow是標準的請求 - 回覆模式,並且updateBroadcastEventSource混合在異步推送消息中。

我希望能夠在某些情況下正常發送錯誤消息並正常終止連接,如錯誤授權,someApiCall失敗或由requestResponseFlow處理的錯誤請求。所以基本上,基本上,我似乎希望能夠異步完成messageFlow以及最後一條消息,儘管其他組成流程仍然存在。

回答

0

找出如何使用KillSwitch來做到這一點。

更新版本

舊版本有,它似乎並沒有在被BidiFlow階段,在棧上層(比如我authGate)觸發到正常工作的問題。我不確定究竟是什麼原因,但將關閉建模爲BidiFlow本身,進一步提高了解決問題的能力。

val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]() 

/** 
* Shutoff valve for the connection. It is triggered when `shutoffPromise` 
* completes, and sends a final optional termination message if that 
* promise resolves with one. 
*/ 
val shutoffBidi = { 
    val terminationMessageSource = Source 
    .maybe[OutgoingWebsocketEvent] 
    .mapMaterializedValue(_.completeWith(shutoffPromise.future)) 

    val terminationMessageBidi = BidiFlow.fromFlows(
    Flow[IncomingWebsocketEventOrAuthorize], 
    Flow[OutgoingWebsocketEvent].merge(terminationMessageSource) 
) 

    val terminator = BidiFlow 
    .fromGraph(KillSwitches.singleBidi[IncomingWebsocketEventOrAuthorize, OutgoingWebsocketEvent]) 
    .mapMaterializedValue { killSwitch => 
     shutoffPromise.future.foreach { _ => println("Shutting down connection"); killSwitch.shutdown() } 
    } 

    terminationMessageBidi.atop(terminator) 
} 

然後我申請它只是codec內:

val handler = codec 
    .atop(shutoffBidi) 
    .atop(authGate(authSuccessMessage)) 
    .join(messageFlow) 

舊版本

val shutoffPromise = Promise[Option[OutgoingWebsocketEvent]]() 

/** 
* Shutoff valve for the flow of outgoing messages. It is triggered when 
* `shutoffPromise` completes, and sends a final optional termination 
* message if that promise resolves with one. 
*/ 
val shutoffFlow = { 
    val terminationMessageSource = Source 
    .maybe[OutgoingWebsocketEvent] 
    .mapMaterializedValue(_.completeWith(shutoffPromise.future)) 

    Flow 
    .fromGraph(KillSwitches.single[OutgoingWebsocketEvent]) 
    .mapMaterializedValue { killSwitch => 
     shutoffPromise.future.foreach(_ => killSwitch.shutdown()) 
    } 
    .merge(terminationMessageSource) 
} 

然後handler樣子:

val handler = codec 
    .atop(authGate(authSuccessMessage)) 
    .join(messageFlow via shutoffFlow) 
+0

如果我可能會問,爲什麼要關閉讓一個懶惰的val? –

+0

@ViktorKlang我有一個'case class'中的整個web套接字處理程序,並且由於初始化順序而導致崩潰,我不想擔心它。 – acjay

+0

當然,但這個例子並不需要。 :) –