2016-03-20 68 views
1

我開始學習阿卡流和我從這裏運行的第一個例子:阿卡流階段不執行同時

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-rate.html#stream-rate-scala

import akka.stream.scaladsl._ 
import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 

object Main extends App { 

    implicit val system = ActorSystem("TestSystem") 
    implicit val mat = ActorMaterializer() 

    Source(1 to 10) 
    .map { i => println(s"A: $i"); i } 
    .map { i => println(s"B: $i"); i } 
    .map { i => println(s"C: $i"); i } 
    .runWith(Sink.ignore) 

} 

基礎上例中,輸出應該是不確定性像這樣:

A: 1 
A: 2 
B: 1 
A: 3 
B: 2 
C: 1 
B: 3 
C: 2 
C: 3 

但是,當我運行它,它永遠不會開始處理下一個元素,直到前一個完全處理。

A: 1 
B: 1 
C: 1 
A: 2 
B: 2 
C: 2 
A: 3 
B: 3 
C: 3 
A: 4 
B: 4 
C: 4 
A: 5 
B: 5 
C: 5 
A: 6 
B: 6 
C: 6 
A: 7 
B: 7 
C: 7 
A: 8 
B: 8 
C: 8 
A: 9 
B: 9 
C: 9 
A: 10 
B: 10 
C: 10 

我也嘗試添加延遲到每個階段/地圖(帶的Thread.sleep),只有一件事是在一次處理,彷彿演員系統只有一個線程。我能夠確認Akka調度程序有足夠的線程。

import system.dispatcher 
    val start = System.currentTimeMillis() 

    (1 to 10).map { i => Future { Thread.sleep(1000); println(s"Finished ${i} after ${System.currentTimeMillis() - start}ms") } 

輸出:

Finished 5 after 1004ms 
Finished 2 after 1004ms 
Finished 6 after 1005ms 
Finished 8 after 1004ms 
Finished 4 after 1004ms 
Finished 9 after 1005ms 
Finished 7 after 1004ms 
Finished 3 after 1005ms 
Finished 1 after 1006ms 
Finished 10 after 1009ms 

東西是否需要進行調整,以獲得階段可以同時處理?

我使用Akka Streams 2.4.2和Java 1.8.0_65-b17。

回答

4

我認爲,你正在觀察Operator Fusion,這意味着所有三個map操作正在同一個Actor上執行。

的可熔元件例如:

  • 所有GraphStages(包括所有內置結除了GROUPBY)
  • 所有階段(包括所有內置線性算子)
  • TCP連接

您可以通過傳遞禁用它的配置參數akka.stream.materializer.auto-fusing=off來禁用它(我不確定它通常是個好主意)o您可以通過添加異步邊界在代碼中禁用它(請參閱我附加的鏈接以獲取更多詳細信息)。單個異步邊界中的所有內容都會在單個actor上執行。

Source(1 to 10) 
    .map { i => println(s"A: $i"); i }.async 
    .map { i => println(s"B: $i"); i }.async 
    .map { i => println(s"C: $i"); i }.async 
    .runWith(Sink.ignore) 
+1

謝謝!而已。我想他們還沒有開始爲我正在測試的示例更新文檔,因爲Operator Fusion改變了這種行爲。再次感謝! – voipdaddy

+0

@voipdaddy fyi:更新文檔已經有一個問題:https://github.com/akka/akka/issues/20017。 – Mihai238