2017-07-06 27 views
0

我想轉換我的源代碼Flowable的方式是,只有事件在特定時段內成爲第一個項目纔會發生。RxJava:只有在特定時間段內的第一個項目發出

也就是說,我想要第一個項目經過,然後刪除所有後續項目,直到有一段時間,比如10秒,其中沒有上游事件到達。

請注意,這是既不

  • debounce:當且僅當它不是接着又一個10秒這將發出的每個項目 - 但這將迫使即使是第一項有10秒的延遲。我想立即發出第一個項目。
  • throttleFirst:這會發出第一個項目,然後在第一個項目之後放置所有後續項目10秒鐘。我希望在每個上游項目之後重置阻止時段。

我現在已經解決了它這樣的:

source 
    .flatMap { Flowable.just(1).concatWith(Flowable.just(-1).delay(10, TimeUnit.SECONDS)) } 
    .scan(0, { x, y -> x + y }) 
    .map { it > 0 } 
    .distinctUntilChanged() 
    .filter { it } 

注:我不關心source實際的項目,只是他們發生 - 但當然,我可能只是包裝Pair中的項目以及1-1)。

有沒有更簡單的方法來使用內置的RxJava(2)運算符來實現相同的目標?

+0

聽起來像前一個問題:https://stackoverflow.com/questions/41964731/immediate-debounce-in-rx有針對HTTPS的擴展操作: //github.com/akarnokd/RxJava2Extensions#flowabletransformersdebouncefirst – akarnokd

回答

1

有可能通過這一事實來switchMap只能預訂一個Flowable一次,並使用一個布爾值,以檢查是否有發出:

class ReduceThrottle<T>(val period: Long, val unit: TimeUnit) : FlowableTransformer<T, T> { 
    override fun apply(upstream: Flowable<T>): Publisher<T> { 
     return Flowable.defer { 
      val doEmit = AtomicBoolean(true) 

      upstream.switchMap { item -> 
       val ret = if (doEmit.compareAndSet(true, false)) { 
        // We haven't emitted in the last 10 seconds, do the emission 
        Flowable.just(item) 
       } else { 
        Flowable.empty() 
       } 

       ret.concatWith(Completable.timer(period, unit).andThen(Completable.fromAction { 
        // Once the timer successfully expires, reset the state 
        doEmit.set(true) 
       }).toFlowable()) 
      } 
     } 
    } 
} 

然後,它僅僅是將變壓器的問題: source.compose(ReduceThrottle(10, TimeUnit.SECONDS))

+0

啊,那太酷了!非常感謝! –

-1

這可能會做你的需要

source.debounce(item -> Observable.timer(10,TimeUnit.SECONDS)) 
相關問題