0
我玩過Akka-Streams,我試圖通過執行我自己的PushPullStage
來自定義Flow
。我希望Flow
將它從上游接收到的對象累積到列表中,並在上游完成時向下遊發送組之前,根據某些功能對它們進行分組。從PushPullStage發出多個對象
這似乎是一個非常簡單的事情來實現,但我無法弄清楚如何做到這一點!似乎沒有辦法從PushPullStage
發出多個對象。
這是到目前爲止我的執行:
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
for(group <- groups)
ctx.push(group) // this doesn't work
ctx.finish()
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}
編輯
我改變了代碼佔brackpressure和它現在所有工作。基本上我只是需要讓下游Flow
的做他們意味着什麼,並保持牽引要素:
class Accumulate[A] extends PushPullStage[A, List[A]] {
private var groups: List[List[A]] = Nil
private def group(x: A): List[List[A]] = ...
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
groups = group(elem)
ctx.pull()
}
override def onPull(ctx: Context[A]): SyncDirective =
if (ctx.isFinishing) {
groups match {
case Nil => ctx.finish()
case head :: tail =>
groups = tail
ctx.push(head)
}
} else {
ctx.pull()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
}
啊,我沒有想到背壓。我稍微更改了代碼,現在按預期工作(請參閱我的編輯)。謝謝 :) – Oli