2016-05-12 47 views
2

我開始學習Akka Stream。我有簡化爲這是一個問題:Akka Stream - 簡單的源/匯示例入口和出口不對應

import akka.actor.ActorSystem 
import akka.stream.{ActorMaterializer, ClosedShape} 
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source} 

object Test extends App { 
    val graph = GraphDSL.create() { implicit b => 
    val in = Source.fromIterator(() => (1 to 10).iterator.map(_.toDouble)) 
    b.add(in) 
    val out = Sink.foreach[Double] { d => 
     println(s"elem: $d") 
    } 
    b.add(out) 
    in.to(out) 
    ClosedShape 
    } 

    implicit val system = ActorSystem() 
    implicit val mat = ActorMaterializer() 
    val rg = RunnableGraph.fromGraph(graph) 
    rg.run() 
} 

這將引發一個運行時異常:在線程「主」 java.lang.IllegalArgumentException異常

例外:要求失敗:入口[]和出口[]必須對應於入口[map.in]和插座[StatefulMapConcat.out]


的問題是,在我的實際情況下,我不能使用~>運營商從GraphDSL.Implicits,因爲沒有共同的超類型SourceFlow(我的圖是從另一個DSL創建的,而不是在一個地方)。所以我只能使用b.addin.to(out)

回答

2

似乎人們必須使用一個從builder.add獲得出口的一個特殊的「複製」:

val graph = GraphDSL.create() { implicit b => 
    val in = Source.fromIterator(() => (1 to 10).iterator.map(_.toDouble)) 
    val out = Sink.foreach[Double] { d => 
     println(s"elem: $d") 
    } 
    import GraphDSL.Implicits._ 
    val inOutlet = b.add(in).out 
    // ... pass inOutlet around until ... 
    inOutlet ~> out 
    ClosedShape 
    }