TL; DR你可以在一段時間後關閉子。但是,使用輸入來動態設置內置階段的時間是另一回事。
關閉一個子
要關閉的流動時,通常完成它(從上游),但也可以取消它(從下游)。例如,一旦n
元素已經通過,take(n: Int)
流程將取消。
現在,在groupBy
的情況下,您無法完成一個子流,因爲上游流被所有子流共享,但是您可以取消它。如何取決於你想要結束的條件。
但是,要知道,groupBy
消除輸入,用於已經被關閉的子流程:如果ID爲3
一個新的元素來自上游到groupBy
的3
-substream關閉後,它只會被忽略,在未來元素將被拉入。其原因可能是在關閉和重新打開子流之間的過程中可能會丟失一些元素。此外,如果您的流應該運行很長時間,這會影響性能,因爲每個元素都將在轉發到相關(實時)子流之前針對已關閉的子流列表進行檢查。如果你不滿意這個表現,你可能想要實現你自己的有狀態過濾器(比如說,使用bloom過濾器)。
要關閉一個子流,我通常使用take
(如果您只需要給定數量的元素,但在無限流上可能不是這種情況),或者某種超時:completionTimeout
如果您想要固定從實現到關閉的時間或idleTimeout
如果您想在沒有元素通過一段時間時關閉。請注意,這些流不取消流,但是失敗了,所以你必須趕上與recover
或recoverWith
階段異常改變失敗成取消(recoverWith
允許您取消不發送任何最後一個元素,通過與Source.empty
恢復) 。
動態設置超時
現在,你想要的是根據第一通過元素動態設置關閉時間。這更復雜,因爲流的實現與通過它們的元素無關。事實上,在通常情況下(沒有groupBy
)的情況下,流在任何元素通過它們之前都已經實現,因此使用元素來實現它們是沒有意義的。
我有類似的問題that question,並最終使用的groupBy
修改後的版本與簽名
paramGroupBy[K, OO, MM](maxSubstreams: Int, f: Out => K, paramSubflow: K => Flow[Out, OO, MM])
,允許使用定義它的關鍵在於確定每個子。這可以修改爲具有第一個元素(而不是關鍵字)作爲參數。
另一種(可能更簡單,就你的情況而言)的方式是編寫自己的舞臺,完全按照你的要求:從第一個元素獲取結束時間並取消當前的流。這裏是一個示例實現(我使用調度器而不是設置狀態):
object CancelAfterTimer
class CancelAfter[T](getTimeout: T => FiniteDuration) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("CancelAfter.in")
val out = Outlet[T]("CancelAfter.in")
override val shape: FlowShape[T, T] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (!isTimerActive(CancelAfterTimer))
scheduleOnce(CancelAfterTimer, getTimeout(elem))
push(out, elem)
}
override def onTimer(timerKey: Any): Unit =
completeStage() //this will cancel the upstream and close the downstrean
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}