2016-05-12 36 views
3

是否可以將Outlet[A]分爲FlowOps[A, _]?也就是說,如果我有這樣的:Akka Stream:Outlet to FlowOps

import akka.NotUsed 
import akka.stream.Outlet 
import akka.stream.scaladsl.{FlowOps, GraphDSL, Source} 

def filter(in: Outlet[Double]) 
      (implicit b: GraphDSL.Builder[NotUsed]): Outlet[Double] = { 
    val in0: FlowOps[Double, NotUsed] = ??? 
    val res = in0.grouped(8).statefulMapConcat[Double] {() => 
    seq => seq.reverse 
    } 
    res 
    ??? : Outlet[Double] 
} 

爲了讓這個grouped工作隨叫隨到,因爲它會如果in要麼一個SourceFlow

回答

1

這只是缺少implicits的進口,然後Outlet可用於流OPS:

def filter(in: Outlet[Double]) 
      (implicit b: GraphDSL.Builder[NotUsed]): Outlet[Double] = { 
    import GraphDSL.Implicits._ 
    import scala.collection.immutable.{Seq => ISeq} 
    val grouped: PortOps[ISeq[Double]] = in.grouped(8) 
    val flattened: PortOps[Double] = grouped.statefulMapConcat[Double] {() => 
     seq => seq.reverse 
    } 
    flattened.outlet 
    }