1
我有一個上游塊以塊形式發出數據。這個流應該使用throttleFirst進行節制。另外,在所有油門計時器完成後,最後的值應該被髮出。不幸的是,在RxJava 2沒有throttleFierstBuffered運營商,因此我已經實現了ObservableTransformer:RxJava使用autoConnect處理ConnectableObservable
upstream -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2);
return Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged();
}
它運作良好,除了處置。在得到的Observable被處置後,我想要處理上游。我怎樣才能做到這一點?
我試圖訪問一次性使用autoConnect(2,一次性 - > {}),但必須有更好的方法。到目前爲止,我得到了這一點,我不喜歡它:
Observable.<T>create(
emitter -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2, emitter::setDisposable);
Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged()
.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
});
'AUTOCONNECT()'不提供'autoDisconnect'。如果您想要在沒有用戶的情況下從上游斷開連接。使用'refcount()'。代替。 –
嗨!我不能在沒有重播的情況下使用refcount,因爲在第一個訂閱者訂閱之後,上游可以發出一個事件,並且它不會被轉發到尚未訂閱的第二個「分支」。我可能不得不使用subscribeOn,但它似乎有點太多。 –
它是'autoConnect(2,一次性 - > {})'。 – akarnokd