2016-08-03 29 views
3

我無法弄清楚如何立即停止阿卡流可運行圖?如何使用killswitch實現此目的?我剛開始AKK溪流已經過了幾天。在我的情況下,我正在讀取文件中的行,並執行一些流操作並寫入接收器。我想要做的是,只要我想要立即停止閱讀文件,我希望這可能會停止整個運行圖。任何想法都將不勝感激。如何突然停止阿卡流可運行圖?

在此先感謝。

回答

2

由於阿卡流2.4.3,有一種優雅通過KillSwitch停止來自外部的流。

請考慮以下示例,該示例在10秒後停止流式傳輸。

object ExampleStopStream extends App { 

    implicit val system = ActorSystem("streams") 
    implicit val materializer = ActorMaterializer() 

    import system.dispatcher 

    val source = Source. 
    fromIterator(() => Iterator.continually(Random.nextInt(100))). 
    delay(500.millis, DelayOverflowStrategy.dropHead) 
    val square = Flow[Int].map(x => x * x) 
    val sink = Sink.foreach(println) 

    val (killSwitch, done) = 
    source.via(square). 
    viaMat(KillSwitches.single)(Keep.right). 
    toMat(sink)(Keep.both).run() 

    system.scheduler.scheduleOnce(10.seconds) { 
    println("Shutting down...") 
    killSwitch.shutdown() 
    } 

    done.foreach { _ => 
    println("I'm done") 
    Await.result(system.terminate(), 1.seconds) 
    } 

} 
+0

請參見下面的示例代碼: 如何制止下圖,並取消讀取大文件? RunnableGraph.fromGraph(GraphDSL.create(){ 隱式助洗劑=> 進口GraphDSL.Implicits._ //源從文件 VAL甲:出口[字符串] = builder.add(readFileLineByLine)的.out //寫入各行到文件 VAL E:入口[字符串] = builder.add(writeLinesToFile)。在 A〜>電子 ClosedShape })運行() – PainPoints

+0

您正在提供例如使得你知道什麼時候在開始之前停止系統,但在我的情況下,我需要隨時停止正在運行的系統。 – PainPoints

+0

我看到,從阿卡流測試我去t這工作: VAL交換機1 = KillSwitches.shared( 「開關」) VAL下游= RunnableGraph.fromGraph(GraphDSL.create(){ 隱式助洗劑=> 進口GraphDSL。 Implicits._ //源從文件 VAL甲:出口[字符串] = builder.add(readFileLineByLine)的.out //寫入各行到文件 VAL E:入口[字符串] = builder.add(writeLinesToFile ).in A.via(switch1.flow)〜> E ClosedShape })。run() switch1.s​​hutdown() – PainPoints

1

單向有一個服務或shutdownhookup其可以調用圖撤銷

val graph= 
    Source.tick(FiniteDuration(0,TimeUnit.SECONDS), FiniteDuration(1,TimeUnit.SECONDS), Random.nextInt).to(Sink.foreach(println)) 
    val cancellable=graph.run() 

    cancellable.cancel 

的cancellable.cancel可以ActorSystem.registerOnTermination的一部分