2017-01-29 66 views
0

我在Akka流中很新,我一直在使用Rx一段時間,所以我知道所有的操作員都很好,但是我不知道爲什麼我的管道沒有放出值akka流中的流動問題

這裏我的代碼

@Test def mainFlow(): Unit = { 
    val increase = Flow[Int] 
     .map(value => value * 10) 
    val filterFlow = Flow[Int] 
     .filter(value => value > 50) 
     .take(2) 
    Source(0 to 10) 
     .via(increase) 
     .via(filterFlow) 
     .to(Sink.foreach(value => println(s"Item emitted:$value"))) 
     .run() 
    } 

第一流變換10在源倍增發出的值,並只得到高於50項第二流過濾器,然後我只得到2,所以我期待着在60和70的水槽裏,但是沒有任何東西散發出來。

任何想法爲什麼?

回答

3

您的流程已正確構建,併發出您提及的2個元素。 我相信問題出在您的測試上。即,流程異步運行,您的測試是一個普通的程序。因此,測試不會等到流程運行。

您需要在測試中引入一些同步來執行斷言。一種方法是使用ScalaTest的ScalaFutures特徵,它提供了一個futureValue方法。

val increase = Flow[Int] 
    .map(value => value * 10) 
val filterFlow = Flow[Int] 
    .filter(value => value > 50) 
    .take(2) 
Source(0 to 10) 
    .via(increase) 
    .via(filterFlow) 
    .runForeach(value => println(s"Item emitted:$value")) 
    .futureValue 

注意.to(Sink.foreach{...}).run()不公開Future[Done]需要同步的。您的代碼需要更改爲.toMat(Sink.foreach{...})(Keep.right).run(),可縮寫爲.runForeach(...)

+0

謝謝我不知道,一旦你使用流量管道變得異步 – paul

1

因爲什麼你說的是:

對於數字1..10乘以10,但他們永遠只能生產2個第一要素,然後把所有那些大於50,然後打印元件的他們。

此外,您的測試不會等待RunnableFlow的完成,這通常意味着您的程序將在流有機會運行之前退出(Akka Streams異步運行)。

請注意,您的例子有沒有理由使用GraphDSL,你的代碼是相同的:

Source(1 to 10).map(_ * 10).take(2).filter(_ > 50).runForeach(println) 

但由於它是不是真的做任何「有意義異步」我想你一定會覺得很最好用:

(1 to 10).map(_ * 10).take(2).filter(_ > 50).foreach(println) 

但隨後再次,與代碼的當前狀態,它等同於下面的表達式:

() 
+0

我正在學習阿卡流,這就是爲什麼我使用這個愚蠢的例子。關於所有Akka流正在運行異步,我不明白這個背壓的例子如何工作,沒有任何等待https://github.com/politrons/Akka/blob/master/src/main/scala/stream/BackPressure。斯卡拉 – paul