我正在Scala編寫應用程序,我正在使用Akka流。Akka流 - 按流中元素的數量進行過濾
在某一點上,我需要過濾掉N個元素少於N個的流。因此,例如,與N=5
:
Source(List(1,2,3)).via(myFilter) // => List()
Source(List(1,2,3,4)).via(myFilter) // => List()
將成爲空流,
Source(List(1,2,3,4,5)).via(myFilter) // => List(1,2,3,4,5)
Source(List(1,2,3,4,5,6)).via(myFilter) // => List(1,2,3,4,5,6)
將保持不變。
當然,我們無法知道流中元素的數量,直到它結束爲止,並且在推送之前等到最後纔可能不是最好的主意。
所以,相反,我已經考慮過下面的算法:
- 用於第一N-1的元素,只是緩衝它們,而無需進一步傳遞;
- 如果輸入流在到達第N個元素之前完成,則輸出空流;
- 如果輸入流到達第N個元素,輸出緩衝的N-1個元素,然後輸出第N個元素,然後傳遞所有後面的元素。
但是,我不知道如何構建一個實現它的Flow
元素。是否有一些可以使用的內置Akka元素?
編輯:
好了,我打了昨天,我想出了這樣的事情:
Flow[Int].
prefixAndTail(N).
flatMapConcat {
case (prefix, tail) if prefix.length == N =>
Source(prefix).concat(tail)
case _ =>
Source.empty[Int]
}
將它做我想做什麼?