2017-07-19 44 views
0

我有一個愚蠢的問題,但不可能不知道的原因:交替,以減少發送實時統計數據(的減少)

import akka.{Done, NotUsed} 
import akka.actor.Status.Success 
import akka.actor.{ActorRef, ActorSystem} 
import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source} 
import akka.stream.{ActorMaterializer, OverflowStrategy} 

import scala.concurrent.Future 


object Generic { 
    def main(args: Array[String]) { 

    implicit val system = ActorSystem("system") 
    implicit val mat = ActorMaterializer() 

    val sink: Sink[Any, Future[Done]] = Sink.foreach(x => println("Ans =====> " + x)) 

    val counts = Flow[String] 
     .mapConcat(x => x.split("\\s").toList) 
     .filter(!_.isEmpty) 
     .groupBy(Int.MaxValue, identity) 
     .map(x => x -> 1) 
     .reduce((l, r) => (l._1, l._2 + r._2)) 
     .mergeSubstreams 

    val fold: Flow[String, Int, NotUsed] = Flow[String].map(x => 1).fold(0)(_ + _) 

    val words: RunnableGraph[ActorRef] = Source.actorRef(Int.MaxValue, OverflowStrategy.fail) 
     .via(counts) 
     .to(sink) 

    val ref = words.run() 

    for { 
     ln <- scala.io.Source.stdin.getLines.takeWhile(_ != "-1") 
    } { 
     println("---> Message sent " + ln) 
     ref ! ln 
    } 
    ref ! Success("end") 
    Thread.sleep(5000) 
    system.terminate() 
    } 
} 

它確實很簡單的事情:在應用終端,我輸入的句子。它提取單詞,然後保持每個單詞的頻率。它按預期工作。問題是:

  • 來源是一個無限流。即只有當我結束源時,它是否打印輸出。我可以重構程序以始終打印實時統計數據而不是結束數據。我明白了,這是正常現象,由於reduce

一個跛腳的方式做是有內部reduce print語句。但是,我還可以做其他事情嗎,例如發送每個句子到另一個接收器的實時統計信息(通過廣播?)

回答

1

看看scan組合子。它會給你fold/reduce的聚合力,但它會發出中間結果。

// .reduce((l, r) => (l._1, l._2 + r._2)) 
     .scan("" → 0)((l, r) => (l._1, l._2 + r._2)) 

另外,如果你想輸出發送到一個日誌Sink,你可以看看alsoTo,這將有效地執行廣播到選擇的側Sink