2016-12-16 26 views
0

documentation to implement a KillSwitch之後,我能夠編寫這個簡單的示例來停止Source發出無限數字。Akka Streams:KillSwitch自定義SourceShape從視頻文件中發射幀

object KillSwitchSample extends App { 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource 
    val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph) 

    val killSwitch = KillSwitches.shared("switch") 

    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 
    val flow = builder.add(Flow[Int].map(_ * 2)) 
    mySource.via(killSwitch.flow) ~> flow ~> Sink.foreach(println) 
    ClosedShape 
    }).run() 

    Thread.sleep(200) 

    killSwitch.shutdown() 
} 

class NumbersSource extends GraphStage[SourceShape[Int]] { 
    val out: Outlet[Int] = Outlet("NumbersSource") 
    override val shape: SourceShape[Int] = SourceShape(out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     private var counter = 1 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      push(out, counter) 
      counter += 1 
     } 
     }) 
    } 
} 

我的使用情況是,該源發射使用OpenCV的視頻文件幀的意義不同。爲什麼上游沒有取消?我在這裏錯過了什麼?

object KillSwitchMinimalMain extends App { 
    val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java")) 
    System.load(libopencv_java(0)) 

    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    val videoFile = Video("Video.MOV") 

    val sourceGraph: Graph[SourceShape[Frame], NotUsed] = new VideoSource(videoFile) 
    val videoSource: Source[Frame, NotUsed] = Source.fromGraph(sourceGraph) 

    val killSwitch = KillSwitches.shared("switch") 

    RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val matConversion: FlowShape[Frame, Image] = builder.add(Flow[Frame].map { el => MediaConversion.convertMatToImage(el.frame) }) 

    videoSource.via(killSwitch.flow) ~> matConversion ~> Sink.foreach(println) 

    ClosedShape 
    }).run() 

    Thread.sleep(200) 

    killSwitch.shutdown() 
} 

class VideoSource(videoFile: Video) extends GraphStage[SourceShape[Frame]] { 
    val out: Outlet[Frame] = Outlet("VideoSource") 
    override val shape: SourceShape[Frame] = SourceShape(out) 
    val log: Logger = LoggerFactory.getLogger(getClass) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     private val capture = new VideoCapture() 
     private val frame = new Mat() 
     private var videoPos: Double = _ 

     override def preStart(): Unit = { 
     capture.open(videoFile.filepath) 
     readFrame() 
     } 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      push(out, Frame(videoPos, frame)) 
      readFrame() 
     } 
     }) 

     private def readFrame(): Unit = { 
     if (capture.isOpened) { 
      videoPos = capture.get(1) 
      log.info(s"reading frame $videoPos") 
      capture.read(frame) 
     } 
     } 
    } 
} 

控制檯輸出要求通過@svezfaz:

13:17:00.046 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 0.0 
13:17:00.160 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 1.0 
[email protected] 
13:17:00.698 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 2.0 
[email protected] 
13:17:00.826 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 3.0 
[email protected] 
13:17:00.969 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 4.0 
[email protected] 
13:17:01.137 [default-akka.actor.default-dispatcher-3] INFO de.itd.video.VideoSource - reading frame 5.0 
[email protected] 
// and so on .. 
+0

您能詳細介紹一下您在運行OpenCV示例時看到的輸出內容嗎?你知道一幀拉多少時間? –

+0

我用一個記錄器更新了這個問題,表明幀的讀取速度相當快(當然比發射整數慢得多)。 'javafx.scene.image.WritableImage @ xxxxxxxx'是接收器的'println'。 – Toaditoad

回答

1

的問題是,你介紹你的自定義階段攔截。我不知道OpenCV API,但我猜測它發生在您撥打capture.read(frame)時。 現在,除非另有指示,否則您的圖表將在單個演員中運行,因此您舞臺上的阻擋將阻止整個演員。

強制async邊界後,你的來源應該做的伎倆。

另請注意,在這裏您不需要GraphDSL,所有內容都可以使用via/to DSL緊湊地運行。下面

object KillSwitchMinimalMain extends App { 
    val libopencv_java = new File("lib").listFiles().map(_.getAbsolutePath).filter(_.contains("libopencv_java")) 

    System.load(libopencv_java(0)) 
    implicit val actorSystem = ActorSystem() 
    implicit val materializer = ActorMaterializer() 

    val videoFile = Video("Video.MOV") 

    val killSwitch = KillSwitches.shared("switch") 
    val matConversion = Flow[ByteString].map { _.utf8String } 

    Source.fromGraph(new VideoSource()) 
    .async 
    .via(killSwitch.flow) 
    .via(matConversion) 
    .runForeach(println) 

    Thread.sleep(200) 

    killSwitch.shutdown() 
} 

解決方案的嘗試有關併發模型底層的阿卡流你可以看到這篇blogpost更多信息。

+0

非常感謝您的解釋。我記得在[文檔](http://doc.akka.io/docs/akka/2.4/scala/stream/stream-flows-and-basics.html#Operator_Fusion)中閱讀了這些內容,但顯然不能真正應用它。 關於GraphDSL,我知道這裏沒有必要,但發佈的代碼是一個簡化版本,省略了我的廣播和合並階段。但是自從你提到它之外:除了它更詳細之外,是否還有使用GraphDSL的其他缺點? – Toaditoad

+0

另一個缺點值得一提的是,GraphDSL稍微更脆弱,並且在入口和出口未正確連接的任何地方都會出現運行時錯誤。我知道的性能方面沒有太大差異。 –

+0

謝謝。這就說得通了。 – Toaditoad