2016-09-04 66 views
3

我有這種情況,當一個sink(或中間flow)實際上可以產生一些副作用數據,必須推回(或附加)到Source。有沒有一種方法可以使用流式DSL來實現這一點?我可以使用一些阻塞隊列或排序來創建source,然後將數據直接推送到該隊列,但是這是打破流的抽象。也許有一個我不知道的更好的解決方案?Akka流 - 連接水槽到源?

+1

如果「水槽」產生輸出,那麼它不是水槽,而是一個有效的流量。 –

+0

@ViktorKlang好了,我可以有條件地將'Flow'連接到它的'Source',所以在某些情況下,由這個特定的'Flow'發出的事件將會通過Graph的根,就像它由圖的'Source'? – jdevelop

+1

是的,它使用GraphDSL並啓用圓形圖。請記住,循環背壓圖需要一些深思熟慮才能正確使用。 –

回答

2

正如Viktor所說,你可以使用圓形圖。

例如,partition階段允許您選擇流的特定元素。

def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1 

    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 
    val in = Source(1 to 10) 
    val out = Sink.foreach[Int](println) 

    val addOne = Flow[Int].map(_ + 1) 

    val partition = builder.add(Partition[Int](2, partitionFunction)) 
    val merge = builder.add(Merge[Int](2)) 

          in ~> merge ~> partition 
    partition.out(0) ~> addOne ~> merge 
    partition.out(1) ~> out 

    ClosedShape 
    }) 

在這個例子中,源極in連接到merge的一個輸入。整數然後通過partition階段,這將分離偶數和奇數。

偶數正在經歷addOne流程,然後進入merge的第二個輸入(這將使它們再次返回到partition階段)。

奇怪的是直接去水槽out

這允許將一些值反饋回圖中,但它很容易導致循環(這就是爲什麼addOne階段在這裏很重要,沒有它的偶數會被困在圖中)。

1

Reactive-kafka做了類似的事情(至少在0.8版本中):它將Sink所消耗的消息提交給源代碼(Kafka consumer)。

KafkaCommitterSink是執行。儘管這不是一個真正的圓形圖,但據我所知,它更獨立於流的「更新」源。