2
我開始學習Akka Stream。我有簡化爲這是一個問題:Akka Stream - 簡單的源/匯示例入口和出口不對應
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
object Test extends App {
val graph = GraphDSL.create() { implicit b =>
val in = Source.fromIterator(() => (1 to 10).iterator.map(_.toDouble))
b.add(in)
val out = Sink.foreach[Double] { d =>
println(s"elem: $d")
}
b.add(out)
in.to(out)
ClosedShape
}
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
val rg = RunnableGraph.fromGraph(graph)
rg.run()
}
這將引發一個運行時異常:在線程「主」 java.lang.IllegalArgumentException異常
例外:要求失敗:入口[]和出口[]必須對應於入口[map.in]和插座[StatefulMapConcat.out]
的問題是,在我的實際情況下,我不能使用~>
運營商從GraphDSL.Implicits
,因爲沒有共同的超類型Source
和Flow
(我的圖是從另一個DSL創建的,而不是在一個地方)。所以我只能使用b.add
和in.to(out)
。