2016-11-15 29 views
5

假設我有一個處理器,當按下按鈕時會發出一個布爾值,可以認爲這是一個切換。基於RxJava 2.X中的布爾門暫停和恢復觀察值?

boolean gateValue = true; 
    PublishProcessor<Boolean> gate = PublishProcessor.create(); 
    view.onButtonClicked() 
      .subscribe(new Action1<Void>() { 
       @Override 
       public void call(final Void aVoid) { 
        gate.onNext(gateValue = !gateValue); 
       } 
      })); 

我想這樣做是使用門的價值,暫停和恢復可觀察序列,緩衝發射值,而暫停。

我已經閱讀了很多,雖然看起來可能在其他語言的反應式擴展中,但RxJava似乎不支持它。

下面是我想達到的一個例子,它只是每秒輸出一個增量值。當我按下按鈕我所要的輸出停止,直到我再次按下它應該輸出兩次按鍵之間發出的每一個項目:

Flowable.interval(1, TimeUnit.SECONDS) 
       .bufferWhile(gate) 
       .flatMapIterable(longs -> longs) 
       .subscribe(new Consumer<Long>() { 
        @Override 
        public void accept(final Long aLong) throws Exception { 
         view.displayTime(aLong); 
        } 
       }); 

有誰知道的方式來實現這樣的事情?

編輯我已經寫了關於如何實現這一目標https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk

+0

這需要一個自定義操作符,並且我有一個可能的實現RxJava 1 [這裏](https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08)。 – akarnokd

+0

@akarnokd您是否知道RxJava 2足夠了解我是否可以輕鬆轉換它?我讀過創建自定義操作符是非常棘手的,所以我不認爲我處於該級別 –

回答

4

RxJava2Extensions庫中現在有一個運算符valve()來執行請求的行爲。

+0

當你把它與Android的生命週期掛鉤時,你可以完成的事情是驚人的。 – ikarhun

0

只需使用現成的Observable.window運營商,它有一個Observable<T>參數博客文章。

+0

只有當gate observable發射時纔會從源可觀察值發出?我希望源發出,直到門發出假。 –

+0

啊,的確 - 您可以使用2參數打開和關閉正在發射的「真」/「假。 –

+0

如果我告訴它在存在真值時打開它,並且在存在錯誤時關閉,但在窗口關閉時它不緩衝發射的項目,它會將其丟棄。我也剛剛意識到,緩衝區和窗口只能批量發送項目而不是流,所以操作員都不會工作。 –