0
我有一個愚蠢的問題,但不可能不知道的原因:交替,以減少發送實時統計數據(的減少)
import akka.{Done, NotUsed}
import akka.actor.Status.Success
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.Future
object Generic {
def main(args: Array[String]) {
implicit val system = ActorSystem("system")
implicit val mat = ActorMaterializer()
val sink: Sink[Any, Future[Done]] = Sink.foreach(x => println("Ans =====> " + x))
val counts = Flow[String]
.mapConcat(x => x.split("\\s").toList)
.filter(!_.isEmpty)
.groupBy(Int.MaxValue, identity)
.map(x => x -> 1)
.reduce((l, r) => (l._1, l._2 + r._2))
.mergeSubstreams
val fold: Flow[String, Int, NotUsed] = Flow[String].map(x => 1).fold(0)(_ + _)
val words: RunnableGraph[ActorRef] = Source.actorRef(Int.MaxValue, OverflowStrategy.fail)
.via(counts)
.to(sink)
val ref = words.run()
for {
ln <- scala.io.Source.stdin.getLines.takeWhile(_ != "-1")
} {
println("---> Message sent " + ln)
ref ! ln
}
ref ! Success("end")
Thread.sleep(5000)
system.terminate()
}
}
它確實很簡單的事情:在應用終端,我輸入的句子。它提取單詞,然後保持每個單詞的頻率。它按預期工作。問題是:
- 來源是一個無限流。即只有當我結束源時,它是否打印輸出。我可以重構程序以始終打印實時統計數據而不是結束數據。我明白了,這是正常現象,由於
reduce
一個跛腳的方式做是有內部reduce
print語句。但是,我還可以做其他事情嗎,例如發送每個句子到另一個接收器的實時統計信息(通過廣播?)