下面的解決方案允許通過終止由Source.actorRef
階段實現的Actor來從服務器端刪除連接。這隻需發送PoisonPill
即可完成。
現在,我還不清楚你想在連接時識別一個「被禁止」的客戶端,所以這個例子 - 故意 - 非常簡單:服務器在最大數量的客戶連接。如果您想要隨時使用任何其他策略來啓動客戶端,則仍然可以應用相同的邏輯並將PoisonPill
發送給其自己的源演員。
object ChatApp extends App {
implicit val system = ActorSystem("chat")
implicit val executor: ExecutionContextExecutor = system.dispatcher
implicit val materializer = ActorMaterializer()
val route = get {
pathEndOrSingleSlash {
handleWebSocketMessages(websocketFlow)
}
}
val maximumClients = 1
class ChatRef extends Actor {
override def receive: Receive = withClients(Map.empty[UUID, ActorRef])
def withClients(clients: Map[UUID, ActorRef]): Receive = {
case SignedMessage(uuid, msg) => clients.collect{
case (id, ar) if id != uuid => ar ! msg
}
case OpenConnection(ar, uuid) if clients.size == maximumClients => ar ! PoisonPill
case OpenConnection(ar, uuid) => context.become(withClients(clients.updated(uuid, ar)))
case CloseConnection(uuid) => context.become(withClients(clients - uuid))
}
}
object Protocol {
case class SignedMessage(uuid: UUID, msg: String)
case class OpenConnection(actor: ActorRef, uuid: UUID)
case class CloseConnection(uuid: UUID)
}
val chatRef = system.actorOf(Props[ChatRef])
def websocketFlow: Flow[Message, Message, Any] =
Flow[Message]
.mapAsync(1) {
case TextMessage.Strict(s) => Future.successful(s)
case TextMessage.Streamed(s) => s.runFold("")(_ + _)
case b: BinaryMessage => throw new Exception("Binary message cannot be handled")
}.via(chatActorFlow(UUID.randomUUID()))
.map(TextMessage(_))
def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = {
val sink = Flow[String]
.map(msg => Protocol.SignedMessage(connectionId, msg))
.to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))
val source = Source.actorRef(16, OverflowStrategy.fail)
.mapMaterializedValue {
actor : ActorRef => {
chatRef ! Protocol.OpenConnection(actor, connectionId)
}
}
Flow.fromSinkAndSource(sink, source)
}
Http().bindAndHandle(route, "0.0.0.0", 8080)
.map(_ => println(s"Started server..."))
}
從代碼看來,'chatRef'實際上*會接收一個'ConnectionClosed'消息(作爲'Sink.actorRef'的onComplete消息)。你能否澄清你想達到的目標? –
我想手動關閉服務器端的連接。讓我們假設我有一個「禁止用戶」列表,每當被禁用的用戶打開一個連接時,我想通過服務器關閉它。 –