2017-05-24 140 views
0

我想將文件內容逐行傳送給Actor。我有類似的東西:Akka Java文件IO節流

final ActorSystem system = ActorSystem.create("stream_system"); 
final Materializer materializer = ActorMaterializer.create(system); 
final ActorRef actor = system.actorOf(Props.create(streamActor.class), "sink"); 

final Path file = Paths.get("path/file.txt"); 

Sink<ByteString, CompletionStage<Done>> printlnSink = 
     Sink.<ByteString> foreach(chunk -> actor.tell(chunk.utf8String(), null)); 
     //Sink.<ByteString> actorRef(actor, null); 

CompletionStage<IOResult> ioResult = 
     FileIO.fromPath(file) 
       .throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping()) 
       .to(printlnSink) 
       .run(materializer); 

未註釋版本的作品,但它一次性傳輸整個文件內容。評論版本以「未知」消息結束。

我想逐行發送給Actor,延遲幾秒鐘。任何幫助如何做到這一點?接收演員只需輸入String消息並將其輸出。

回答

2

Framing類可以幫助您與此:

CompletionStage<IOResult> ioResult = 
    FileIO.fromPath(file) 
      .via(Framing.delimiter(ByteString.fromString(System.lineSeparator()), 1000, FramingTruncation.ALLOW)) 
      .throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping()) 
      .to(printlnSink) 
      .run(materializer);