2017-05-24 68 views
1

我有一個上游塊以塊形式發出數據。這個流應該使用throttleFirst進行節制。另外,在所有油門計時器完成後,最後的值應該被髮出。不幸的是,在RxJava 2沒有throttleFierstBuffered運營商,因此我已經實現了ObservableTransformer:RxJava使用autoConnect處理ConnectableObservable

upstream -> { 
     Observable<T> autoConnectingUpstream = 
      upstream // 
       .publish() 
       .autoConnect(2); 

     return Observable.merge(
       autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler), 
       autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler)) 
      //if debounce and throttle emit the same item 
      .distinctUntilChanged(); 
    } 

它運作良好,除了處置。在得到的Observable被處置後,我想要處理上游。我怎樣才能做到這一點?

我試圖訪問一次性使用autoConnect(2,一次性 - > {}),但必須有更好的方法。到目前爲止,我得到了這一點,我不喜歡它:

Observable.<T>create(
      emitter -> { 
       Observable<T> autoConnectingUpstream = 
        upstream // 
         .publish() 
         .autoConnect(2, emitter::setDisposable); 

       Observable.merge(
         autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler), 
         autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler)) 
        //if debounce and throttle emit the same item 
        .distinctUntilChanged() 
        .subscribe(emitter::onNext, emitter::onError, emitter::onComplete); 
      }); 
+0

'AUTOCONNECT()'不提供'autoDisconnect'。如果您想要在沒有用戶的情況下從上游斷開連接。使用'refcount()'。代替。 –

+0

嗨!我不能在沒有重播的情況下使用refcount,因爲在第一個訂閱者訂閱之後,上游可以發出一個事件,並且它不會被轉發到尚未訂閱的第二個「分支」。我可能不得不使用subscribeOn,但它似乎有點太多。 –

+0

它是'autoConnect(2,一次性 - > {})'。 – akarnokd

回答

1

我在這裏回答我的問題給我最好的知識,所以請讓我知道,如果我錯了。

基於來自akarnokd的評論,一個解決方案將是這樣的:

Observable.<T>create(
     emitter -> { 
      Observable<T> autoConnectingUpstream = 
       upstream // 
        .publish() 
        .autoConnect(2, emitter::setDisposable); 

      Observable.merge(
        autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler), 
        autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler)) 
       //if debounce and throttle emit the same item 
       .distinctUntilChanged() 
       .subscribe(emitter::onNext, emitter::onError, emitter::onComplete); 
     }); 

AUTOCONNECT的第二個參數是動作,它代表2個連接觀察者的已建立的連接。

這可以通過發射器:: setDisposable處置AUTOCONNECT當觀察者處置所產生的可觀測的使用。