我是Akka團隊的成員,希望使用此問題來澄清有關原始Reactive Streams接口的一些內容。我希望你會覺得這很有用。
最值得注意的是,我們將在Akka團隊博客上發佈多篇有關構建自定義階段(包括Flows)的多篇文章,敬請關注。
不要使用ActorPublisher/ActorSubscriber
,請不要使用ActorPublisher
和ActorSubscriber
。他們的水平太低,最終可能會以違反Reactive Streams specification的方式實施。它們是過去的遺留物,甚至只是「僅限於電力用戶模式」。現在真的沒有理由使用這些類。我們從未提供過構建流程的方法,因爲如果將它暴露爲「原始」Actor API以供您實施並獲得all the rules implemented correctly,則其複雜性僅僅是爆炸性的。
如果你確實想要實現原始的ReactiveStreams接口,那麼請使用Specification's TCK來驗證你的實現是否正確。一些更復雜的角落案例Flow
(或RS術語Processor
需要處理)可能會引起警惕。
大多數操作都可以建立一個沒有去低級別
很多流量,你應該能夠簡單地通過從Flow[T]
建設並添加所需的操作上,只是作爲一個例子建設:
val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)
這是對流的可重複使用的描述。
由於您在詢問高級用戶模式,因此這是DSL本身最強大的運營商:statefulFlatMapConcat
。絕大多數在普通流元素上操作的操作都可以使用它來表示:Flow.statefulMapConcat[T](f:() ⇒ (Out) ⇒ Iterable[T]): Repr[T]
。
如果您需要定時器,你可以zip
有Source.timer
等
GraphStage是的最簡單,最安全的API構建定製階段
相反,建設資源/流/沉沒了自己強大的和安全 API:GraphStage
。請閱讀documentation about building custom GraphStages(它們可以是Sink/Source/Flow甚至任意任意形狀)。它爲您處理所有複雜的反應流規則,同時爲您提供充分的自由度和類型安全性,同時實現您的階段(可能是一個流程)。
例如,從文檔所,是一種GraphStage實施filter(T => Boolean)
操作者:
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Filter.in")
val out = Outlet[A]("Filter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) push(out, elem)
else pull(in)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
它還處理異步通道,默認爲可熔合。高度概括
...明天 -
嗯...我會建議試試看,如果它不工作,然後更新喲你的問題。 – hveiga
有沒有辦法做到這一點。也許我不清楚,但快速查看Flow對象的方法顯示沒有這種方法。我的問題是,如果它以另一種形式/ API存在。謝謝 –