2017-01-12 18 views
1

假設我有兩個無限的相同類型的女巫可以連接到一個Graph。我想從外部已經物化的圖形之間切換它們,可能與使用KillSwitch關閉其中一個圖形的方式相同。如何在多個來源之間切換?

val source1: Source[ByteString, NotUsed] = ??? 
val source2: Source[ByteString, NotUsed] = ??? 

val (switcher: Switcher, source: Source[ByteString, NotUsed]) = 
    Source.combine(source1,source2).withSwitcher.run() 

switcher.switch() 

默認情況下,我想用source1和開關我想從source2

source1 
     \ 
      switcher ~> source  

source2 

消耗數據後,是否有可能實現與阿卡流這個邏輯?

回答

2

好的,經過一段時間我找到了解決方案。

所以在這裏我可以使用與我們在VLAN中相同的原理。我只需要標記我的源代碼,然後通過MergeHub傳遞它們。之後,很容易通過標記過濾這些來源併產生正確的結果作爲來源。

所有我需要從一個切換到另一個源是過濾條件的變化。

source1.map(s => (tag1, s)) 
          \ 
          MergeHub.filter(_._1 == tagX).map(_._2) -> Source 
         /
source2.map(s => (tag2, s)) 

下面是一些例子:

object SomeSource { 

    private var current = "tag1" 

    val source1: Source[ByteString, NotUsed] = ??? 
    val source2: Source[ByteString, NotUsed] = ??? 

    def switch = { 
    current = if (current == "tag1") "tag2" else "tag1" 
    } 

    val (sink: Sink[(String, ByteString), NotUsed], 
     source: Source[ByteString, NotUsed]) = 
    MergeHub.source[(String, ByteString)] 
     .filter(_._1 == current) 
     .via(Flow[(String, ByteString)].map(_._2)) 
     .toMat(BroadcastHub.sink[ByteString])(Keep.both).run() 

    source1.map(s => ("tag1", s)).runWith(sink) 
    source2.map(s => ("tag2", s)).runWith(sink) 

} 

SomeSource.source // do something with Source 

SomeSource.switch() // then switch