2016-08-24 33 views
15

可以分別使用Source.actorPublisher()Sink.actorSubscriber()方法從演員創建源和匯。但是有可能從演員創建一個Flow從阿卡流中的演員創建​​流

從概念上講,似乎沒有一個很好的理由不考慮它,因爲它實現了ActorPublisherActorSubscriber特徵,但不幸的是,Flow對象沒有任何方法來做到這一點。在this優秀的博客文章中,它是在Akka Streams的早期版本中完成的,所以問題在於最新的(2.4.9)版本中是否可能。

+0

嗯...我會建議試試看,如果它不工作,然後更新喲你的問題。 – hveiga

+0

有沒有辦法做到這一點。也許我不清楚,但快速查看Flow對象的方法顯示沒有這種方法。我的問題是,如果它以另一種形式/ API存在。謝謝 –

回答

28

我是Akka團隊的成員,希望使用此問題來澄清有關原始Reactive Streams接口的一些內容。我希望你會覺得這很有用。

最值得注意的是,我們將在Akka團隊博客上發佈多篇有關構建自定義階段(包括Flows)的多篇文章,敬請關注。

不要使用ActorPublisher/ActorSubscriber

,請不要使用ActorPublisherActorSubscriber。他們的水平太低,最終可能會以違反Reactive Streams specification的方式實施。它們是過去的遺留物,甚至只是「僅限於電力用戶模式」。現在真的沒有理由使用這些類。我們從未提供過構建流程的方法,因爲如果將它暴露爲「原始」Actor API以供您實施並獲得all the rules implemented correctly,則其複雜性僅僅是爆炸性的。

如果你確實想要實現原始的ReactiveStreams接口,那麼請使用Specification's TCK來驗證你的實現是否正確。一些更復雜的角落案例Flow(或RS術語Processor需要處理)可能會引起警惕。

大多數操作都可以建立一個沒有去低級別

很多流量,你應該能夠簡單地通過從Flow[T]建設並添加所需的操作上,只是作爲一個例子建設:

val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt) 

這是對流的可重複使用的描述。

由於您在詢問高級用戶模式,因此這是DSL本身最強大的運營商:statefulFlatMapConcat。絕大多數在普通流元素上操作的操作都可以使用它來表示:Flow.statefulMapConcat[T](f:() ⇒ (Out) ⇒ Iterable[T]): Repr[T]

如果您需要定時器,你可以zipSource.timer

GraphStage是最簡單,最安全的API構建定製階段

相反,建設資源/流/沉沒了自己強大的和安全 API:GraphStage。請閱讀documentation about building custom GraphStages(它們可以是Sink/Source/Flow甚至任意任意形狀)。它爲您處理所有複雜的反應流規則,同時爲您提供充分的自由度和類型安全性,同時實現您的階段(可能是一個流程)。

例如,從文檔所,是一種GraphStage實施filter(T => Boolean)操作者:

class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] { 

    val in = Inlet[A]("Filter.in") 
    val out = Outlet[A]("Filter.out") 

    val shape = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val elem = grab(in) 
      if (p(elem)) push(out, elem) 
      else pull(in) 
     } 
     }) 
     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 
    } 
} 

它還處理異步通道,默認爲可熔合。高度概括

  • ...明天 -

  • +0

    建議_「不使用ActorPublisher和ActorSubscriber ...違反反應流規範」_是否適用於_「從演員創建流?」或者也用於從演員創建源。因爲演員似乎是創建資源的自然方式:http://doc.akka.io/docs/akka/2.4/scala/stream/stages-overview.html#actorPublisher。仍然,想知道是否這個開箱即用的執行是聰明的,不會壓倒演員?請參閱下面的「演員包含緩衝區」。 – SemanticBeeng

    +1

    只需使用GraphStage來實現您的源代碼,它將會更快,更高效;-)鏈接到單一方法Akka Streams必須直接連接這些方法有點讓人誤解,因爲我們有一整頁解釋如何實現自定義階段:http://doc.akka.io/docs/akka/2.4/scala/stream/stream-customize.html正如我已經提到的,堅持GraphStage會帶來很多好處:性能,可熔性,可調試性等。如果你已經有一個出版商,但你可以在Akka上使用它。 –

    12

    ķ onrad的解決方案演示瞭如何創建一個使用Actor的自定義舞臺,但在大多數情況下,我認爲這有點矯枉過正。

    通常你有一些演員是能反應問題:

    val actorQueryFlow : Int => Flow[Input, Output, _] = 
        (parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor) 
    

    val actorRef : ActorRef = ??? 
    
    type Input = ??? 
    type Output = ??? 
    
    val queryActor : Input => Future[Output] = 
        (actorRef ? _) andThen (_.mapTo[Output]) 
    

    這可以通過基本的Flow功能,這需要在併發請求的最大數量很容易地利用現在actorFlow可以集成到任何流...

    +3

    我實際上同意,應該找時間修改我的答案......如果您有時間,請隨時編輯它!這兩種方式應該解釋,你作爲推薦的 –

    +2

    @ Konrad'ktoso'Malawski我很感謝你驗證我的答案。另外,感謝所有關於阿卡的工作。你們正在做一些非常酷的東西。 –