2
我正在更新一些實驗性噴霧代碼,以使阿卡流(2.0.2),所以我一點一點地通過他的文檔。我的一個需求是,如果我在流中檢測到協議違規,我需要立即關閉流並啓動客戶端。如何關閉阿卡流?
什麼是立即關閉(終止)流內流的正確方法?
我正在更新一些實驗性噴霧代碼,以使阿卡流(2.0.2),所以我一點一點地通過他的文檔。我的一個需求是,如果我在流中檢測到協議違規,我需要立即關閉流並啓動客戶端。如何關閉阿卡流?
什麼是立即關閉(終止)流內流的正確方法?
使用PushStage
:
import akka.stream.stage._
val closeStage = new PushStage[Tpe, Tpe] {
override def onPush(elem: Tpe, ctx: Context[Tpe]) = elem match {
case elem if shouldCloseStream ⇒
// println("stream closed")
ctx.finish()
case elem ⇒
ctx.push(elem)
}
}
你可以通過transform
方法與Flow
一個PushStage
結合:
Flow[Tpe]
.map(...)
.transform(() ⇒ closeStage)
.map(...)
如果你還在讀這些文字,我剛纔注意到,在2.4.2 PushStage已棄用。你如何在GraphStage中完成這項工作?我試過了completeStage,但那不是。 –
我添加了一個新問題:http://stackoverflow.com/questions/35544856/closing-an-akka-stream-2-4-2 –