我想通過websockets發送通知給客戶端。因此我試圖在服務器啓動時創建一個actor的消息流,並訂閱websockects到這個流的連接(只發送訂閱後發出的通知)如何使用Akka Streams和Akka HTTP將websockets訂閱到actor的消息?
與Source.actorRef我們可以創建一個Source演員消息。
val ref = Source.actorRef[Weather](Int.MaxValue, fail)
.filter(!_.raining)
.to(Sink foreach println)
.run()
ref ! Weather("02139", 32.0, true)
但我怎麼可以訂閱(阿卡HTTP *)這個源WebSockets的連接,如果已被物化?在阿卡HTTP
* WebSockets的連接需要一個流程[消息,信息,任何]
我試圖做的是一樣的東西
// at server startup
val notifications: Source[Notification,ActorRef] = Source.actorRef[Notificacion](5,OverflowStrategy.fail)
val ref = notifications.to(Sink.foreach(println(_))).run()
val notificationActor = system.actorOf(NotificationActor.props(ref))
// on ws connection
val notificationsWS = path("notificationsWS") {
parameter('name) { name ⇒
get {
onComplete(flow(name)){
case Success(f) => handleWebSocketMessages(f)
case Failure(e) => throw e
}
}
}
}
def flow(name: String) = {
val messages = notifications filter { n => n.name equals name } map { n => TextMessage.Strict(n.data) }
Flow.fromSinkAndSource(Sink.ignore, notifications)
}
這doensn't的工作,因爲通知源並不是物化的那個,因此它不會發射任何元素。
注:我是用Source.actorPublisher和它的工作,但ktoso discourages his usage也是我得到這個錯誤:
java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0.