2014-12-01 85 views
4

我正在玩Akka Streams,並且已經計算出了大部分的基礎知識,但我不清楚如何獲取Merge的結果以及對它進行進一步的操作(地圖,過濾器,摺疊等)。如何將Akka Streams合併的輸出管道輸送到另一個Flow?

我想修改下面的代碼,以便不用管道合併到接收器,我可以改爲操縱數據。

implicit val materializer = FlowMaterializer() 

val items_a = Source(List(10,20,30,40,50)) 
val items_b = Source(List(60,70,80,90,100)) 
val sink = ForeachSink(println) 

val materialized = FlowGraph { implicit builder => 
    import FlowGraphImplicits._ 
    val merge = Merge[Int]("m1") 
    items_a ~> merge 
    items_b ~> merge ~> sink 
}.run() 

我想我的主要問題是,我無法弄清楚如何使不具有源的流動分量,我無法弄清楚如何做一個合併,而無需使用特殊合併對象和~>語法。

編輯:這個問題,答案是並用阿卡流0.11

回答

6

工作如果你不關心語義的Merge哪裏去要素下游隨機,那麼你可以只是嘗試concatSource,而不是像所以:

items_a.concat(items_b).map(_ * 2).map(_.toString).foreach(println) 

這裏的區別是,從a所有項目將下游第一b任何元素之前流動。如果你真的需要的Merge的行爲,那麼你可以考慮類似如下(請記住,你將最終需要一個接收器,但你肯定可以做合併後的附加變換):

val items_a = Source(List(10,20,30,40,50)) 
val items_b = Source(List(60,70,80,90,100)) 

val sink = ForeachSink[Double](println) 
val transform = Flow[Int].map(_ * 2).map(_.toDouble).to(sink) 


val materialized = FlowGraph { implicit builder => 
    import FlowGraphImplicits._ 
    val merge = Merge[Int]("m1") 
    items_a ~> merge 
    items_b ~> merge ~> transform 
}.run 

在這例如,您可以看到,我使用Flow伴侶助手創建Flow,但沒有輸入Source。從那裏我可以將它附加到合併點來獲得我的額外處理。

+0

謝謝,這正是我一直在尋找的。 – 2014-12-02 14:16:32

4

使用Source.combine

val items_a :: items_b :: items_c = List(
     Source(List(10,20,30,40,50)), 
     Source(List(60,70,80,90,100), 
     Source(List(110,120,130,140,1500)) 

Source.combine(items_a, items_b, items_c : _*)(Merge(_)) 
     .map(_+1) 
     .runForeach(println) 
0

或者,如果你需要保存的輸入源的順序(例如items_a必須在items_b和items_b必須items_c之前)可以使用的毗連,而不是合併。

val items_a :: items_b :: items_c = List(
    Source(List(10,20,30,40,50)), 
    Source(List(60,70,80,90,100), 
    Source(List(110,120,130,140,1500)) 
Source.combine(items_a, items_b, items_c : _*)(Concat(_)) 
相關問題