2017-10-06 33 views
1

我正在使用Akka websockets將數據推送到某個客戶端。使用Actor將數據發送到Akka websockets

這是我目前的做法:客戶端連接之後,伺服器會向它發送一則「Hello!」訊息:

import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.util.concurrent.CompletionStage; 
import java.util.concurrent.TimeUnit; 

import akka.NotUsed; 
import akka.actor.ActorSystem; 
import akka.http.javadsl.ConnectHttp; 
import akka.http.javadsl.Http; 
import akka.http.javadsl.ServerBinding; 
import akka.http.javadsl.model.HttpRequest; 
import akka.http.javadsl.model.HttpResponse; 
import akka.http.javadsl.model.ws.Message; 
import akka.http.javadsl.model.ws.WebSocket; 
import akka.japi.Function; 
import akka.stream.ActorMaterializer; 
import akka.stream.Materializer; 
import akka.stream.javadsl.Flow; 
import akka.stream.javadsl.Sink; 
import akka.stream.javadsl.Source; 

public class Server { 

    public static HttpResponse handleRequest(HttpRequest request) { 
    System.out.println("Handling request to " + request.getUri()); 
    if (request.getUri().path().equals("/greeter")) { 
     final Flow<Message, Message, NotUsed> greeterFlow = greeterHello(); 
     return WebSocket.handleWebSocketRequestWith(request, greeterFlow); 
    } else { 
     return HttpResponse.create().withStatus(404); 
    } 
    } 

    public static void main(String[] args) throws Exception { 
    ActorSystem system = ActorSystem.create(); 

    try { 
     final Materializer materializer = ActorMaterializer.create(system); 

     final Function<HttpRequest, HttpResponse> handler = request -> handleRequest(request); 
     CompletionStage<ServerBinding> serverBindingFuture = Http.get(system).bindAndHandleSync(handler, 
      ConnectHttp.toHost("localhost", 8080), materializer); 

     // will throw if binding fails 
     serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); 
     System.out.println("Press ENTER to stop."); 
     new BufferedReader(new InputStreamReader(System.in)).readLine(); 
    } finally { 
     system.terminate(); 
    } 
    } 

    public static Flow<Message, Message, NotUsed> greeterHello() { 
    return Flow.fromSinkAndSource(Sink.ignore(), 
     Source.single(new akka.http.scaladsl.model.ws.TextMessage.Strict("Hello!"))); 
    } 
} 

在客戶端,我能成功接收到訊息。不過,現在我想動態地發送數據(最好是從一個 Actor 發送),類似這樣:

import akka.actor.ActorRef; 
import akka.actor.UntypedActor; 

/**
 * Actor that accepts {@link String} status-change messages and reports any other
 * message type on standard output.
 */
public class PushActor extends UntypedActor {

    /**
     * Handles one incoming message.
     *
     * @param message the received message; only {@link String} instances are accepted
     */
    @Override
    public void onReceive(Object message) {
        if (message instanceof String) {
            String statusChangeMessage = (String) message;
            // TODO: forward statusChangeMessage to the WebSocket connection; how to do
            // that is the open question here and is not implemented in this class.
        } else {
            // getSelf().path() identifies this actor; the original referenced an
            // undefined variable (selfActorPath) at this position.
            System.out.println(String.format("'%s':\nReceived unknown message '%s'!", getSelf().path(), message));
        }
    }

}

我在網上找不到任何與此相關的例子。

下面是正在使用的軟件棧:

回答

0

一種(不一定很優雅的)做法是使用 Source.actorRef,並根據你的需求把物化(materialize)得到的 ActorRef 發送到某個地方(也許是一個路由 Actor?)。

/**
 * Builds a WebSocket handler flow whose outgoing side is fed by an actor: incoming
 * messages are discarded, and messages sent to the materialized actor ref are emitted
 * to the client.
 *
 * @return a flow that ignores its input and emits what the materialized actor ref receives
 */
public static Flow<Message, Message, NotUsed> greeterHello() {
    // Explicit type witnesses are needed because the call is chained and therefore not
    // target-typed by the method's return type.
    return Flow.fromSinkAndSourceMat(Sink.<Message>ignore(),
            // Buffer of 100 elements; the stream is failed when the buffer overflows.
            Source.<Message>actorRef(100, OverflowStrategy.fail()),
            Keep.right())
        .mapMaterializedValue(clientRef -> {
            // TODO: hand clientRef to whatever routes messages to connected clients
            // (e.g. a router actor). Nothing is registered here yet.
            return NotUsed.getInstance();
        });
}

接收這些已連接客戶端的 ActorRef 的一方,必須負責把消息路由給它們。