2017-04-11 28 views
2

我有一個SinkShape圖裏面的循環:如何正確終止包含循環的圖形?

source.take(10) ~> merge ~> process ~> split ~> out 
        merge.preferred <~ split   

內部我要確保每個Stream元素循環的數量是有限的。

我兌現這個圖用下面的代碼:

val result: Future[Int] = Source.fromGraph(graph).runWith(Sink.fold(...)) 

現在我試圖獲得該Future的價值,但onComplete回調永遠不會調用。圖中原始的Source應該正確終止,因爲我添加了.take(10)語句。

我還嘗試以消除環路,並且如預期

回答

2

Merge階段(和MergePreferred也一樣)可以熱切或無法完成生成Future。 換句話說,您可以告訴您的舞臺在其輸入完成時任何完成,或當其輸入的全部完成時完成。

在你的情況下,你需要一個急切的完成,作爲2個輸入之一(即標記爲首選的輸入)將永遠不會完成,因爲環回。

Merge階段以布爾型標誌顯示完成行爲,默認爲false(有關更多信息,請參見docs)。嘗試將其更改爲true。

val merge = b.add(MergePreferred(secondaryPorts = 2, eagerComplete = true))