我開始學習阿卡流和我從這裏運行的第一個例子:阿卡流階段不執行同時
http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-rate.html#stream-rate-scala
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
object Main extends App {
implicit val system = ActorSystem("TestSystem")
implicit val mat = ActorMaterializer()
Source(1 to 10)
.map { i => println(s"A: $i"); i }
.map { i => println(s"B: $i"); i }
.map { i => println(s"C: $i"); i }
.runWith(Sink.ignore)
}
基礎上例中,輸出應該是不確定性像這樣:
A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3
但是,當我運行它,它永遠不會開始處理下一個元素,直到前一個完全處理。
A: 1
B: 1
C: 1
A: 2
B: 2
C: 2
A: 3
B: 3
C: 3
A: 4
B: 4
C: 4
A: 5
B: 5
C: 5
A: 6
B: 6
C: 6
A: 7
B: 7
C: 7
A: 8
B: 8
C: 8
A: 9
B: 9
C: 9
A: 10
B: 10
C: 10
我也嘗試添加延遲到每個階段/地圖(帶的Thread.sleep),只有一件事是在一次處理,彷彿演員系統只有一個線程。我能夠確認Akka調度程序有足夠的線程。
import system.dispatcher
val start = System.currentTimeMillis()
(1 to 10).map { i => Future { Thread.sleep(1000); println(s"Finished ${i} after ${System.currentTimeMillis() - start}ms") }
輸出:
Finished 5 after 1004ms
Finished 2 after 1004ms
Finished 6 after 1005ms
Finished 8 after 1004ms
Finished 4 after 1004ms
Finished 9 after 1005ms
Finished 7 after 1004ms
Finished 3 after 1005ms
Finished 1 after 1006ms
Finished 10 after 1009ms
東西是否需要進行調整,以獲得階段可以同時處理?
我使用Akka Streams 2.4.2和Java 1.8.0_65-b17。
謝謝!而已。我想他們還沒有開始爲我正在測試的示例更新文檔,因爲Operator Fusion改變了這種行爲。再次感謝! – voipdaddy
@voipdaddy fyi:更新文檔已經有一個問題:https://github.com/akka/akka/issues/20017。 – Mihai238