2016-01-22 99 views
2

我正在更新一些實驗性噴霧代碼,以使阿卡流(2.0.2),所以我一點一點地通過他的文檔。我的一個需求是,如果我在流中檢測到協議違規,我需要立即關閉流並啓動客戶端。如何關閉阿卡流?

什麼是立即關閉(終止)流內流的正確方法?

回答

3

使用PushStage

import akka.stream.stage._ 

val closeStage = new PushStage[Tpe, Tpe] { 
    override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match { 
    case elem if shouldCloseStream ⇒ 
     // println("stream closed") 
     ctx.finish() 
    case elem ⇒ 
     ctx.push(elem) 
    } 
} 

你可以通過transform方法與Flow一個PushStage結合:

Flow[Tpe] 
.map(...) 
.transform(() ⇒ closeStage) 
.map(...) 
+1

如果你還在讀這些文字,我剛纔注意到,在2.4.2 PushStage已棄用。你如何在GraphStage中完成這項工作?我試過了completeStage,但那不是。 –

+0

我添加了一個新問題:http://stackoverflow.com/questions/35544856/closing-an-akka-stream-2-4-2 –