2017-05-07 22 views
1

我有一個圖形接受一系列文件,逐個處理它們,然後在執行結束時,如果所有執行成功或失敗,程序應該返回成功(0)或失敗(-1)。
這最後一步如何實現?水槽怎麼知道它何時收到最後一個文件的結果?如何在處理所有元素時發出信號?

val graph = createGraph("path-to-list-of-files") 
val result = graph.run() 

def createGraph(fileOrPath: String): RunnableGraph[NotUsed] = { 
    printStage("PREPARING") { 
    val producer: Source[ProducerFile, NotUsed] = Producer(fileOrPath).toSource() 
    val validator: Flow[ProducerFile, ProducerFile, NotUsed] = Validator().toFlow() 
    val provisioner: Flow[ProducerFile, PrivisionerResult, NotUsed] = Provisioner().toFlow() 
    val executor: Flow[PrivisionerResult, ExecutorResult, NotUsed] = Executor().toFlow() 
    val evaluator: Flow[ExecutorResult, EvaluatorResult, NotUsed] = Evaluator().toFlow() 
    val reporter: Sink[EvaluatorResult, Future[Done]] = Reporter().toSink() 

    val graphResult = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 
    producer ~> validator ~> provisioner ~> executor ~> evaluator ~> reporter 

    ClosedShape 
    }) 
    printLine("The graph pipeline was created") 
    graphResult 
} 

回答

1

reporter水槽已經物化到Future[Done],您可以鉤到,如果你想在所有的元素都處理運行一些代碼。

但是,此刻你並沒有將它暴露在你的圖中。雖然有一種方法使用圖形DSL揭露它,你的情況是,更容易用流利的DSL來實現這一目標:

val graphResult: RunnableGraph[Future[Done]] = producer 
    .via(validator) 
    .via(provisioner) 
    .via(executor) 
    .via(evaluator) 
    .toMat(reporter)(Keep.right) 

這會給你回Future[Done]當您運行圖形

val result: Future[Done] = graph.run() 

然後你可以掛鉤 - 例如

result.onComplete { 
    case Success(_) => println("Success!") 
    case Failure(_) => println("Failure..") 
}