2016-02-05 20 views
1

根據傳遞的消息的某些屬性,考慮一個簡單的場景,我希望它在特定的下一個階段處理並繼續。如何有條件地將消息傳遞給下一個階段(以及其他階段)?

[Source[ActionMessage]] ~> [Flow[ActionMessage, EnrichedActionMessage]] 
~> (eAM: EnrichedActionMessage => eAM.actionType match { 
     case ActionA => eAM ~> Flow[EnrichedActionMessage, ReactionA] ~> Sink[ReactionA] 
     case ActionB => eAM ~> Flow[EnrichedActionMessage, ReactionB] ~> Sink[ReactionB] 
     case ActionC => eAM ~> Flow[EnrichedActionMessage, ReactionC] ~> Sink[ReactionC] 
    }) 

我該如何實現到階段圖階段的條件路由?

+0

嘗試創建一個使用廣播扇出3個平行叉的流圖。然後,每個叉子都會有一個過濾器步驟,以便在進入該類型的接收器之前過濾掉該叉子的正確類型。 – cmbaxter

回答

1

此答案是基於akka-stream版本2.4.2-RC1。其他版本中的API可能略有不同。依賴關係可以通過sbt消耗:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2-RC1" 

使用Partition組件:

val shape = GraphDSL.create() { implicit b ⇒ 
    import GraphDSL.Implicits._ 

    val first = b.add(Sink.foreach[Int](elem ⇒ println("first:\t" + elem))) 
    val second = b.add(Sink.foreach[Int](elem ⇒ println("second:\t" + elem))) 
    val third = b.add(Sink.foreach[Int](elem ⇒ println("third:\t" + elem))) 
    val p = b.add(Partition[Int](3, elem ⇒ elem match { 
    case 0    ⇒ 0 
    case elem if elem < 0 ⇒ 1 
    case elem if elem > 0 ⇒ 2 
    })) 

    p ~> first 
    p ~> second 
    p ~> third 

    SinkShape(p.in) 
} 
Source(List(0, 1, 2, -1, 1, -5, 0)).to(shape).run() 

/* 
Output: 
first: 0 
third: 1 
third: 2 
second: -1 
third: 1 
second: -5 
first: 0 
*/ 

取而代之的SinkShape你也可以返回new FanOutShape3(p.in, p.out(0), p.out(1), p.out(2))如果你希望做這些單元的任意處理在稍後的時間點。

+0

正在使用分區,正在導致一個問題,它不會將當前處理的消息發送到下游組件,直到從源中將更多消息推入分區。 – phantomastray

+0

@theGhost我不明白你的意思,分區接收到的每個元素都將立即發送到其中一個接收器。 – sschaef

+0

好的。我們發現,stream materializer'autofuse'是默認啓用的;造成一種奇怪的行爲,不傳播有線階段的拉鍊。禁用autofuse解決了它。 分區事實上工作得很好。 – phantomastray

相關問題