2016-04-26 46 views
2

我正在嘗試使用Akka Streams的某些東西來爲我的項目之一使用Rx Scala。我很想看看Akka Streams如何取代我們擁有的Rx Scala庫。有一件事我沒有看到Akka Streams可能有一個源和許多接收器。也就是說,在這個例子中取直出從阿卡流文檔:使用Akka Streams的許多水槽的一個來源

val source = Source(1 to 10) 
val sink = Sink.fold[Int, Int](0)(_ + _) 

// connect the Source to the Sink, obtaining a RunnableGraph 
val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right) // how could I materialize to a Seq of Sinks? 

// materialize the flow and get the value of the FoldSink 
val sum: Future[Int] = runnable.run() 

當使用RX類庫,都在我的來源(可觀察),庫(觀察員)完全分離,給了我靈活地映射1個來源(Observable)並且有n個Sinks(觀察者)。我如何通過Akka Streams實現這一目標?任何指針都會有幫助!

回答

2

這是可與Graphs,具體Broadcast

廣播[T] - (1-輸入,N輸出)給定的輸入元件發射到 每個輸出

一些示例代碼從該文件:

val in = Source(1 to 10) 
val out = Sink.ignore 

val bcast = builder.add(Broadcast[Int](2)) 
val merge = builder.add(Merge[Int](2)) 

val f1, f2, f3, f4 = Flow[Int].map(_ + 10) 

in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out 
      bcast ~> f4 ~> merge 
ClosedShape 
相關問題