2016-05-25 26 views
1

如何獲得連接InletOutlet的實例FlowShape?考慮下面的例子如何在FlowShape中獲得連接的入口和出口實例?

def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val ticker = Source.tick(rate, rate, Unit) 

    val zip = builder.add(Zip[T, Unit.type]) 
    val map = Flow[(T, Unit.type)].map { case (value, _) => value } 
    val messageExtractor = builder.add(map) 

    val in = Inlet[T]("Req.in") 
    val out = Outlet[T]("Req.out") 

    out ~> zip.in0 
    ticker ~> zip.in1 
    zip.out ~> messageExtractor.in 

    FlowShape.of(in, messageExtractor.out) 
}) 

,當我在Source.via()使用它我得到例外

Caused by: java.lang.IllegalArgumentException: requirement failed: The output port [Req.out] is not part of the underlying graph. 
    at scala.Predef$.require(Predef.scala:219) 
    at akka.stream.impl.StreamLayout$Module$class.wire(StreamLayout.scala:204) 

我缺少的是以下?

回答

3

in入口和out出口沒有連接到任何東西。這就是爲什麼有一個例外(遺憾的是此類問題只能在運行時檢測)

你想要一個流,其中只開放入口拉鍊入口中的一個(zip.in0,因爲zip.in1連接到股票代碼) ,唯一開放的插座是messageExtractor的輸出,那麼怎麼樣:

def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val ticker = Source.tick(rate, rate,()) 

    val zip = builder.add(Zip[T, Unit]) 
    val map = Flow[(T, Unit)].map { case (value, _) => value } 
    val messageExtractor = builder.add(map) 

    ticker ~> zip.in1 
    zip.out ~> messageExtractor.in 

    FlowShape.of(zip.in0, messageExtractor.out) 
}) 
+0

啊我看到了。所以返回的FlowShape定義了連接。非常感謝! – expert

+1

僅供參考,它按照預期與Akka 2.4.9編譯和工作。謝謝。 –