2017-02-23 72 views
1

這是一個非常簡單的,使用GraphDSL API的新手問題。我讀了幾個相關的SO線程和我沒有看到答案:Akka Streams:如何從GraphDSL API獲取物化接收器輸出?

val actorSystem = ActorSystem("QuickStart") 
val executor = actorSystem.dispatcher 
val materializer = ActorMaterializer()(actorSystem) 

val source: Source[Int, NotUsed] = Source(1 to 5) 
val throttledSource = source.throttle(1, 1.second, 1, ThrottleMode.shaping) 
val intDoublerFlow = Flow.fromFunction[Int, Int](i => i * 2) 
val sink = Sink.foreach(println) 

val graphModel = GraphDSL.create() { implicit b => 
    import GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> sink 

    // I presume I want to change this shape to something else 
    // but I can't figure out what it is. 
    ClosedShape 
} 
// TODO: This is RunnableGraph[NotUsed], I want RunnableGraph[Future[Done]] that gives the 
// materialized Future[Done] from the sink. I presume I need to use a GraphDSL SourceShape 
// but I can't get that working. 
val graph = RunnableGraph.fromGraph(graphModel) 

// This works and gives me the materialized sink output using the simpler API. 
// But I want to use the GraphDSL so that I can add branches or junctures. 
val graphThatIWantFromDslAPI = throttledSource.toMat(sink)(Keep.right) 

回答

4

訣竅是通過你想要物化值(在你的情況下,sink)到GraphDSL.create的階段。您作爲第二個參數傳遞的函數也會發生變化,需要在您的圖形中使用Shape輸入參數(下例中爲s)。

val graphModel: Graph[ClosedShape, Future[Done]] = GraphDSL.create(sink) { implicit b => s => 
    import GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> s 

    // ClosedShape is just fine - it is always the shape of a RunnableGraph 
    ClosedShape 
    } 
    val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel) 

更多信息可在docs找到。

+0

我upvoted你的文檔參考;-) –

+1

ahh毆打到終點;)打得好 –

+0

感謝你們倆。當我將接收器添加到GraphDSL.create(sink)調用時,ClosedShape會收到編譯器錯誤。我如何更新? – clay

3
val graphModel = GraphDSL.create(sink) { implicit b: Builder[Future[Done]] => sink => 
    import akka.stream.scaladsl.GraphDSL.Implicits._ 

    throttledSource ~> intDoublerFlow ~> sink 

    ClosedShape 
} 
val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)  
val graphThatIWantFromDslAPI: RunnableGraph[Future[Done]] = throttledSource.toMat(sink)(Keep.right) 

與GraphDSL API的問題是,隱式生成器負荷過重。您需要將接收器包裝在create中,這會將Builder[NotUsed]變成Builder[Future[Done]],並且現在代表builder => sink => shape而不是builder => shape的功能。

+0

謝謝。當我將sink參數添加到'GraphDSL.create'時,'ClosedShape'行會得到一個新的編譯器錯誤。任何想法如何更新? – clay

+0

對不起,在另一個答案上回答:-) –

相關問題