2016-05-16 39 views
5

如果我已經創造了阿卡流的RunningGraph,我怎麼能知道(從外部)監視封閉圖形阿卡流

  1. 當所有的節點都由於取消了完成?
  2. 當所有節點由於錯誤而被停止時?

回答

14

我不認爲有一種方法可以爲任意的圖做到這一點,但是如果你有你的圖在控制之下,你只需要將監視接收器附加到每個可能失敗或完成的節點的輸出(這些都是至少有一個輸出節點),例如:

import akka.actor.Status 

// obtain graph parts (this can be done inside the graph building as well) 
val source: Source[Int, NotUsed] = ... 
val flow: Flow[Int, String, NotUsed] = ... 
val sink: Sink[String, NotUsed] = ... 

// create monitoring actors 
val aggregate = actorSystem.actorOf(Props[Aggregate]) 
val sourceMonitorActor = actorSystem.actorOf(Props(new Monitor("source", aggregate))) 
val flowMonitorActor = actorSystem.actorOf(Props(new Monitor("flow", aggregate))) 

// create the graph 
val graph = GraphDSL.create() { implicit b => 
    import GraphDSL._ 

    val sourceMonitor = b.add(Sink.actorRef(sourceMonitorActor, Status.Success(()))), 
    val flowMonitor = b.add(Sink.actorRef(flowMonitorActor, Status.Success(()))) 

    val bc1 = b.add(Broadcast[Int](2)) 
    val bc2 = b.add(Broadcast[String](2)) 

    // main flow 
    source ~> bc1 ~> flow ~> bc2 ~> sink 

    // monitoring branches 
    bc1 ~> sourceMonitor 
    bc2 ~> flowMonitor 

    ClosedShape 
} 

// run the graph 
RunnableGraph.fromGraph(graph).run() 

class Monitor(name: String, aggregate: ActorRef) extends Actor { 
    override def receive: Receive = { 
    case Status.Success(_) => aggregate ! s"$name completed successfully" 
    case Status.Failure(e) => aggregate ! s"$name completed with failure: ${e.getMessage}" 
    case _ => 
    } 
} 

class Aggregate extends Actor { 
    override def receive: Receive = { 
    case s: String => println(s) 
    } 
} 

也可以建立只有一個監測演員和各監測接收器使用它,但在這種情況下,你將無法在失敗的流之間輕鬆區分。

另外還有watchTermination()關於來源和流量的方法,它允許實現與流量一起終止的未來。我想這可能是難以GraphDSL使用,但與常規流的方法可能是這樣的:

import akka.Done 
import akka.actor.Status 
import akka.pattern.pipe 

val monitor = actorSystem.actorOf(Props[Monitor]) 
source 
    .watchTermination()((f, _) => f pipeTo monitor) 
    .via(flow).watchTermination((f, _) => f pipeTo monitor) 
    .to(sink) 
    .run() 

class Monitor extends Actor { 
    override def receive: Receive = { 
    case Done => println("stream completed") 
    case Status.Failure(e) => println(s"stream failed: ${e.getMessage}") 
    } 
} 

你可以它的值通過管道傳遞給演員流之間進行區分之前改變未來。

+0

我喜歡這種方式,謝謝 –

+0

優秀的答案,謝謝 – botkop

+0

@ println中缺少@ vladimir-matveev's'。 '「$ name' - >'s」$ name' – lamusique