2016-02-07 44 views
0

我是新來的阿卡流,所以想問問如何重現本文http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/scala/stream-rate.html重現阿卡流異步輸出

中提出對於給定的代碼的行爲

Source(1 to 3) 
    .map { i => println(s"A: $i"); i } 
    .map { i => println(s"B: $i"); i } 
    .map { i => println(s"C: $i"); i } 
    .runWith(Sink.ignore) 

得到這樣類似

A: 1 
A: 2 
B: 1 
A: 3 
B: 2 
C: 1 
B: 3 
C: 2 
C: 3 

我試過添加一些隨機Thread.sleep, 從無限迭代器創建一個流。 但是Akka相應的調試輸出總是使用相同的線程進行處理。

所以問題是:如何使用akka-stream重現異步行爲(每個階段應該以異步方式運行)?

+0

你在async下的含義是什麼?你的預期行爲是什麼? – 1esha

+0

東西,exept此 A:1- B:1 C:1 A:2- B:2 C:2 A:3- B:3 C:3 – alatom

+0

然後執行所有在一個地圖步驟? – 1esha

回答

2

您看到順序操作的原因是因爲您的所有操作都脫離了同一個源,因此在同一個異步邊界內。爲了讓「異步行爲」您正在尋找您需要添加Flows

implicit val actorSystem = ActorSystem() 
implicit val actorMaterializer = ActorMaterializer() 

Source(1 to 3).via(Flow[Int].map{i => println(s"A: $i"); i }) 
       .via(Flow[Int].map{i => println(s"B: $i"); i }) 
       .via(Flow[Int].map{i => println(s"C: $i"); i }) 
       .runWith(Sink.ignore) 

每個流將物化到一個單獨的演員。注意:要獲得真正的併發性,ActorSystem正在運行的線程池必須有多個線程。

有一點要記住:ActorSystem的好處是它承擔了對操作的低級別控制的責任,以便開發人員可以專注於「業務邏輯」。這也可能是一個缺點。根據您的ActorSystem配置,JVM配置和硬件配置,操作順序可能仍然是同步的。

+0

沒有關於如何執行的擔保。每個可以在單獨的線程中執行。你也應該記住緩衝 – 1esha

+0

@ 1esha,完全同意。我的建議只是放鬆了會導致順序處理的一個限制。我會根據您的評論進行相應更新。謝謝。 –