2015-09-19 24 views
0

我玩過Akka-Streams,我試圖通過執行我自己的PushPullStage來自定義Flow。我希望Flow將它從上游接收到的對象累積到列表中,並在上游完成時向下遊發送組之前,根據某些功能對它們進行分組。從PushPullStage發出多個對象

這似乎是一個非常簡單的事情來實現,但我無法弄清楚如何做到這一點!似乎沒有辦法從PushPullStage發出多個對象。

這是到目前爲止我的執行:

class Accumulate[A] extends PushPullStage[A, List[A]] { 
    private var groups: List[List[A]] = Nil 

    private def group(x: A): List[List[A]] = ... 

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = { 
     groups = group(elem) 
     ctx.pull() 
    } 

    override def onPull(ctx: Context[A]): SyncDirective = 
     if (ctx.isFinishing) { 
     for(group <- groups) 
      ctx.push(group) // this doesn't work 

     ctx.finish() 
     } else { 
     ctx.pull() 
     } 

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective = 
     ctx.absorbTermination() 
    } 
} 

編輯

我改變了代碼佔brackpressure和它現在所有工作。基本上我只是需要讓下游Flow的做他們意味着什麼,並保持牽引要素:

class Accumulate[A] extends PushPullStage[A, List[A]] { 
    private var groups: List[List[A]] = Nil 

    private def group(x: A): List[List[A]] = ... 

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = { 
     groups = group(elem) 
     ctx.pull() 
    } 

    override def onPull(ctx: Context[A]): SyncDirective = 
     if (ctx.isFinishing) { 
     groups match { 
      case Nil => ctx.finish() 

      case head :: tail => 
      groups = tail 
      ctx.push(head) 
     } 
     } else { 
     ctx.pull() 
     } 

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective = 
     ctx.absorbTermination() 
    } 
} 

回答

2

你不能推超過了要求,因爲這將違反反壓。 另外,值得注意的是,我不會推薦你正在嘗試做什麼,因爲這將爆炸與OutOfMemoryError大或無界流。

class Accumulate[A] extends PushPullStage[A, List[A]] { 
    private var groups: List[List[A]] = Nil 

    private def group(x: A): List[List[A]] = ... 

    override def onPush(elem: A, ctx: Context[A]): SyncDirective = { 
     groups = group(elem) 
     ctx.pull() 
    } 

    override def onPull(ctx: Context[A]): SyncDirective = 
     if (ctx.isFinishing) { 
     groups match { 
      case Nil => ctx.finish() 
      case group :: rest => 
      groups = rest 
      ctx.push(group) 
     } 
     } else { 
     ctx.pull() 
     } 

    override def onUpstreamFinish(ctx: Context[A]): TerminationDirective = 
     ctx.absorbTermination() 
    } 
} 
+0

啊,我沒有想到背壓。我稍微更改了代碼,現在按預期工作(請參閱我的編輯)。謝謝 :) – Oli