2016-04-30 21 views
5

我的應用程序有一個Akka-Websocket接口。 Web套接字由一個actor-subscriber和一個actor actor組成。用戶通過將命令發送給相應的演員來處理命令。發佈者在事件流上進行偵聽並將更新信息發佈迴流(最終發佈到客戶端)。這很好。如何通過網絡套接字連接將訂閱者的反應流中的消息發送給發佈者

我的問題:訂閱者如何將事件發送迴流?例如確認接收到的命令的執行。

public class WebSocketApp extends HttpApp { 

    private static final Gson gson = new Gson(); 

    @Override 
    public Route createRoute() { 
    return get(
     path("metrics").route(handleWebSocketMessages(metrics())) 
     ); 
    } 

    private Flow<Message, Message, ?> metrics() { 
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props()); 
    Source<Message, ActorRef> metricsSource = 
     Source.actorPublisher(WebSocketDataPublisherActor.props()) 
     .map((measurementData) -> TextMessage.create(gson.toJson(measurementData))); 
    return Flow.fromSinkAndSource(metricsSink, metricsSource); 
    } 
} 

一個很好的解決辦法是,將訂閱的角色(WebSocketCommandSubscriber演員在上面的代碼)可以發送回一個消息像sender().tell(...)流...

回答

4

不,這是不可能的,至少不是直接的。流始終是單向的 - 所有消息都在一個方向上流動,而對它們的需求則以相反的方向流動。您需要將確認消息從接收器傳遞給源代碼,以便後者將其發回給客戶端,例如,通過在源宿主中註冊源主角。它看起來是這樣的:

Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> { 
    sinkActor.tell(new RegisterSource(sourceActor), null); 
}) 

然後,您的水槽演員收到RegisterSource消息後,就可以發送確認郵件所提供的ActorRef,然後將它們轉發到輸出流。

相關問題