我無法弄清楚如何立即停止阿卡流可運行圖?如何使用killswitch實現此目的?我剛開始AKK溪流已經過了幾天。在我的情況下,我正在讀取文件中的行,並執行一些流操作並寫入接收器。我想要做的是,只要我想要立即停止閱讀文件,我希望這可能會停止整個運行圖。任何想法都將不勝感激。如何突然停止阿卡流可運行圖?
在此先感謝。
我無法弄清楚如何立即停止阿卡流可運行圖?如何使用killswitch實現此目的?我剛開始AKK溪流已經過了幾天。在我的情況下,我正在讀取文件中的行,並執行一些流操作並寫入接收器。我想要做的是,只要我想要立即停止閱讀文件,我希望這可能會停止整個運行圖。任何想法都將不勝感激。如何突然停止阿卡流可運行圖?
在此先感謝。
由於阿卡流2.4.3,有一種優雅通過KillSwitch
停止來自外部的流。
請考慮以下示例,該示例在10秒後停止流式傳輸。
object ExampleStopStream extends App {
implicit val system = ActorSystem("streams")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val source = Source.
fromIterator(() => Iterator.continually(Random.nextInt(100))).
delay(500.millis, DelayOverflowStrategy.dropHead)
val square = Flow[Int].map(x => x * x)
val sink = Sink.foreach(println)
val (killSwitch, done) =
source.via(square).
viaMat(KillSwitches.single)(Keep.right).
toMat(sink)(Keep.both).run()
system.scheduler.scheduleOnce(10.seconds) {
println("Shutting down...")
killSwitch.shutdown()
}
done.foreach { _ =>
println("I'm done")
Await.result(system.terminate(), 1.seconds)
}
}
單向有一個服務或shutdownhookup其可以調用圖撤銷
val graph=
Source.tick(FiniteDuration(0,TimeUnit.SECONDS), FiniteDuration(1,TimeUnit.SECONDS), Random.nextInt).to(Sink.foreach(println))
val cancellable=graph.run()
cancellable.cancel
的cancellable.cancel可以ActorSystem.registerOnTermination的一部分
請參見下面的示例代碼: 如何制止下圖,並取消讀取大文件? RunnableGraph.fromGraph(GraphDSL.create(){ 隱式助洗劑=> 進口GraphDSL.Implicits._ //源從文件 VAL甲:出口[字符串] = builder.add(readFileLineByLine)的.out //寫入各行到文件 VAL E:入口[字符串] = builder.add(writeLinesToFile)。在 A〜>電子 ClosedShape })運行() – PainPoints
您正在提供例如使得你知道什麼時候在開始之前停止系統,但在我的情況下,我需要隨時停止正在運行的系統。 – PainPoints
我看到,從阿卡流測試我去t這工作: VAL交換機1 = KillSwitches.shared( 「開關」) VAL下游= RunnableGraph.fromGraph(GraphDSL.create(){ 隱式助洗劑=> 進口GraphDSL。 Implicits._ //源從文件 VAL甲:出口[字符串] = builder.add(readFileLineByLine)的.out //寫入各行到文件 VAL E:入口[字符串] = builder.add(writeLinesToFile ).in A.via(switch1.flow)〜> E ClosedShape })。run() switch1.shutdown() – PainPoints