我有這種情況,當一個sink
(或中間flow
)實際上可以產生一些副作用數據,必須推回(或附加)到Source
。有沒有一種方法可以使用流式DSL來實現這一點?我可以使用一些阻塞隊列或排序來創建source
,然後將數據直接推送到該隊列,但是這是打破流的抽象。也許有一個我不知道的更好的解決方案?Akka流 - 連接水槽到源?
3
A
回答
2
正如Viktor所說,你可以使用圓形圖。
例如,partition
階段允許您選擇流的特定元素。
def partitionFunction(i: Int): Int = if (i % 2 == 0) 0 else 1
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.foreach[Int](println)
val addOne = Flow[Int].map(_ + 1)
val partition = builder.add(Partition[Int](2, partitionFunction))
val merge = builder.add(Merge[Int](2))
in ~> merge ~> partition
partition.out(0) ~> addOne ~> merge
partition.out(1) ~> out
ClosedShape
})
在這個例子中,源極in
連接到merge
的一個輸入。整數然後通過partition
階段,這將分離偶數和奇數。
偶數正在經歷addOne
流程,然後進入merge
的第二個輸入(這將使它們再次返回到partition
階段)。
奇怪的是直接去水槽out
。
這允許將一些值反饋回圖中,但它很容易導致循環(這就是爲什麼addOne
階段在這裏很重要,沒有它的偶數會被困在圖中)。
1
Reactive-kafka做了類似的事情(至少在0.8版本中):它將Sink所消耗的消息提交給源代碼(Kafka consumer)。
KafkaCommitterSink是執行。儘管這不是一個真正的圓形圖,但據我所知,它更獨立於流的「更新」源。
相關問題
- 1. 將Akka HTTP連接到Akka流
- 2. 源數據到水槽
- 3. 使用Akka Streams的許多水槽的一個來源
- 4. 如何設計卡夫卡連接即水槽以及來源
- 5. 水槽錯誤:連接被拒絕
- 6. 火花流整合水槽
- 7. 編寫自定義水槽NG源/水槽的最佳方式
- 8. 水槽 - 是否有可能只停止水槽來源?
- 9. 水槽水槽星火使用Scala的
- 10. Akka流源 - cassandra結果集
- 11. 實施水槽水槽
- 12. 使用水槽將水槽記錄到水槽所需的格式
- 13. Qt轉發插槽/連接插槽到插槽?
- 14. akka流中的流動問題
- 15. Akka http連接池
- 16. Akka遠程連接
- 17. 如何 「contramap」 阿卡流水槽
- 18. 阿帕奇水槽不與Twitter流
- 19. 連接QAction插槽
- 20. 在AKKA-Streams中使用動態水槽目的地?
- 21. 收集水槽物化值作爲源
- 22. 如何將paintEvent連接到插槽?
- 23. 如何將信號連接到插槽
- 24. 幾個QSliders連接到一個插槽
- 25. 無法連接到家長插槽
- 26. Qt將信號連接到插槽
- 27. 連接到驅動器槽ssh跳
- 28. QAction不會連接到我的插槽
- 29. 如何rowCountChanged連接到插槽
- 30. 將QAction連接到虛擬插槽?
如果「水槽」產生輸出,那麼它不是水槽,而是一個有效的流量。 –
@ViktorKlang好了,我可以有條件地將'Flow'連接到它的'Source',所以在某些情況下,由這個特定的'Flow'發出的事件將會通過Graph的根,就像它由圖的'Source'? – jdevelop
是的,它使用GraphDSL並啓用圓形圖。請記住,循環背壓圖需要一些深思熟慮才能正確使用。 –