2017-02-06 83 views
0

我想通過websockets發送通知給客戶端。因此我試圖在服務器啓動時創建一個actor的消息流,並訂閱websockects到這個流的連接(只發送訂閱後發出的通知)如何使用Akka Streams和Akka HTTP將websockets訂閱到actor的消息?

Source.actorRef我們可以創建一個Source演員消息。

val ref = Source.actorRef[Weather](Int.MaxValue, fail) 
       .filter(!_.raining) 
       .to(Sink foreach println) 
       .run() 

ref ! Weather("02139", 32.0, true) 

但我怎麼可以訂閱(阿卡HTTP *)這個源WebSockets的連接,如果已被物化?在阿卡HTTP

* WebSockets的連接需要一個流程[消息,信息,任何]

我試圖做的是一樣的東西

// at server startup 
val notifications: Source[Notification,ActorRef] = Source.actorRef[Notificacion](5,OverflowStrategy.fail) 
val ref = notifications.to(Sink.foreach(println(_))).run() 
val notificationActor = system.actorOf(NotificationActor.props(ref)) 

// on ws connection 
val notificationsWS = path("notificationsWS") { 
    parameter('name) { name ⇒ 
    get { 
     onComplete(flow(name)){ 
     case Success(f) => handleWebSocketMessages(f) 
     case Failure(e) => throw e 
     } 
    } 
    } 
} 

def flow(name: String) = { 
    val messages = notifications filter { n => n.name equals name } map { n => TextMessage.Strict(n.data) } 
    Flow.fromSinkAndSource(Sink.ignore, notifications) 
} 

這doensn't的工作,因爲通知源並不是物化的那個,因此它不會發射任何元素。

注:我是用Source.actorPublisher和它的工作,但ktoso discourages his usage也是我得到這個錯誤:

java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0. 

回答

1

你可以暴露物化actorRef使用mapMaterializedValue一些外部路由器的演員。

Flow.fromSinkAndSourceMat(Sink.ignore, notifications)(Keep.right) 
    .mapMaterializedValue(srcRef => router ! srcRef) 

路由器可以跟蹤你的源代碼actorrefs的(臨終看護可以幫助收拾東西),並轉發消息給他們。

注意:您可能已經注意到了,但請注意,通過使用Source.actorRef來提供您的流量,您的流量將不會反壓(您選擇的策略只會在負載下崩潰)。