2017-05-17 41 views
5

鑑於我有一段很長時間的事件流,流經如下所示的事件。當很長一段時間過去後,將會有很多子流被創建而不再需要。如何清理連續阿卡流中的子流

有一種方法來清理一個特定子在給定時間,對於 例如通過ID 3中創建的子應清洗並在13Pm失去了掃描方法的狀態 (到期婦女參與發展的屬性) ?

case class Wid(id: Int, v: String, expires: LocalDateTime) 
test("Substream with scan") { 
    val (pub, sub) = TestSource.probe[Wid] 
    .groupBy(Int.MaxValue, _.id) 
    .scan("")((a: String, b: Wid) => a + b.v) 
    .mergeSubstreams 
    .toMat(TestSink.probe[String])(Keep.both) 
    .run() 
} 

回答

3

TL; DR你可以在一段時間後關閉子。但是,使用輸入來動態設置內置階段的時間是另一回事。

關閉一個子

要關閉的流動時,通常完成它(從上游),但也可以取消它(從下游)。例如,一旦n元素已經通過,take(n: Int)流程將取消。

現在,在groupBy的情況下,您無法完成一個子流,因爲上游流被所有子流共享,但是您可以取消它。如何取決於你想要結束的條件。

但是,要知道,groupBy消除輸入,用於已經被關閉的子流程:如果ID爲3一個新的元素來自上游到groupBy3 -substream關閉後,它只會被忽略,在未來元素將被拉入。其原因可能是在關閉和重新打開子流之間的過程中可能會丟失一些元素。此外,如果您的流應該運行很長時間,這會影響性能,因爲每個元素都將在轉發到相關(實時)子流之前針對已關閉的子流列表進行檢查。如果你不滿意這個表現,你可能想要實現你自己的有狀態過濾器(比如說,使用bloom過濾器)。

要關閉一個子流,我通常使用take(如果您只需要給定數量的元素,但在無限流上可能不是這種情況),或者某種超時:completionTimeout如果您想要固定從實現到關閉的時間或idleTimeout如果您想在沒有元素通過一段時間時關閉。請注意,這些流不取消流,但是失敗了,所以你必須趕上與recoverrecoverWith階段異常改變失敗成取消(recoverWith允許您取消不發送任何最後一個元素,通過與Source.empty恢復) 。

動態設置超時

現在,你想要的是根據第一通過元素動態設置關閉時間。這更復雜,因爲流的實現與通過它們的元素無關。事實上,在通常情況下(沒有groupBy)的情況下,流在任何元素通過它們之前都已經實現,因此使用元素來實現它們是沒有意義的。

我有類似的問題that question,並最終使用的groupBy修改後的版本與簽名

paramGroupBy[K, OO, MM](maxSubstreams: Int, f: Out => K, paramSubflow: K => Flow[Out, OO, MM]) 

,允許使用定義它的關鍵在於確定每個子。這可以修改爲具有第一個元素(而不是關鍵字)作爲參數。

另一種(可能更簡單,就你的情況而言)的方式是編寫自己的舞臺,完全按照你的要求:從第一個元素獲取結束時間並取消當前的流。這裏是一個示例實現(我使用調度器而不是設置狀態):

object CancelAfterTimer 

class CancelAfter[T](getTimeout: T => FiniteDuration) extends GraphStage[FlowShape[T, T]] { 
    val in = Inlet[T]("CancelAfter.in") 
    val out = Outlet[T]("CancelAfter.in") 
    override val shape: FlowShape[T, T] = FlowShape(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { 
    override def onPush(): Unit = { 
     val elem = grab(in) 
     if (!isTimerActive(CancelAfterTimer)) 
     scheduleOnce(CancelAfterTimer, getTimeout(elem)) 
     push(out, elem) 
    } 

    override def onTimer(timerKey: Any): Unit = 
     completeStage() //this will cancel the upstream and close the downstrean 

    override def onPull(): Unit = pull(in) 

    setHandlers(in, out, this) 
    } 
}