2017-04-22 133 views
1

我正在玩Akka Streams和使用Alpakka從文件流式傳輸內容。我需要在一段時間後停止流,所以我想使用KillSwitc h。但我不知道如何使用它,因爲我正在使用圖形DSL。Akka Stream DSL圖KillSwitch

My圖表看起來是這樣的:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 

    source ~> mainFlow ~> sink 

    ClosedShape 
}) 

graph.run() 

我發現這裏的解決方案:How to abruptly stop an akka stream Runnable Graph?

不過,我不知道如何,如果我使用的圖形DSL應用它。你能給我一些建議嗎?

回答

2

要在GraphDSL中顯示物化值,可以將物化爲該值的階段傳遞給create方法。用一個例子更容易解釋。在你的情況:

val switch = KillSwitches.single[Int] 

    val graph: RunnableGraph[UniqueKillSwitch] = 
    RunnableGraph.fromGraph(GraphDSL.create(switch) { implicit builder: GraphDSL.Builder[UniqueKillSwitch] => sw => 
    import GraphDSL.Implicits._ 

    source ~> mainFlow ~> sw ~> sink 

    ClosedShape 
    }) 

    val ks = graph.run() 
    ks.shutdown() 
+0

好吧,完美:)我還有一個問題,你有一個想法如何在進程n元素後停止流?現在我正在使用system.scheduler.scheduleOnce(10.seconds),但更好的是在經過處理的定義的元素數而不是時間之後停止。 – martyn

+0

不需要kill開關,你可以使用'.take(n)' –

+0

我知道,但是我想要n個元素,停止流,在那個呼叫系統之後調用另一個做別的事情的方法。終止()。我想測量時間花費多少時間來處理例如100個元素。 – martyn

相關問題