2017-04-18 63 views
4

我的用例是想在我的onNext中的特定條件之後進行處理。所以試圖使用DisposableObserver。這是工作如何在RxJava2中使用帶有lambda表達式的DisposableObserver

Observable.just(1, 2, 3, 4) 
    .subscribe(new DisposableObserver<Integer>() { 
        @Override 
        public void onNext(Integer integer) { 
         System.out.println("onNext() received: " + integer); 
         if (integer == 2) { 
         dispose(); 
         } 
        } 
        @Override 
        public void onError(Throwable e) { System.out.println("onError()"); } 
        @Override 
        public void onComplete() { System.out.println("onComplete()"); } 
        } 
    ); 

現在,如果你嘗試用lambda來代替這一點,對待lambda作爲

subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete) 

這樣做是爲了現在這樣的代碼。通過從onSubscribe保存一次性,然後調用disposable.dispose();從onNext。

private Disposable disposable; 
    private void disposableObserverTest() { 
    Observable.just(1, 2, 3, 4) 
     .subscribe(integer -> { 
       System.out.println("onNext() received: " + integer); 
       if (integer == 2) { 
       disposable.dispose(); 
       } 

      }, throwable -> System.out.println("error"), 
      () -> System.out.println("complete"), 
      disposable1 -> { 
       this.disposable = disposable1; 
      }); 
    } 

但是,如果您想直接調用dispose()如何使用lambda表達式?

+1

「這給出了一個錯誤」 - 什麼樣的? – azizbekian

+0

錯誤是它不會查找dispose(),因爲它將lambda視爲 ConsumeronNext,Consumer <? super Throwable> onError, Action onComplete。 但我希望lambda成爲DisposableObserver。 – bpr10

回答

3

這是因爲在第一種情況下,你叫

subscribe(DisposableObserver observer) 

而在第二種情況下,你叫

subscribe(Action1<? extends Integer> onNext, Action1<? extends Throwable> onError, Action0 onComplete) 

這意味着在第二種情況下,你不抱參考DisposableObserver,因此你不能打電話給dispose()

+0

確切的問題。所以我想在第二種情況下使用帶有lambda表達式的DisposableObserver。 – bpr10

4

您可以使用takeUntil關閉觀測值。

@Test 
public void takeUntil() throws Exception { 
    Observable.just(1, 2, 3, 4) 
      .takeUntil(integer -> integer == 2) 
      .test() 
      .assertValues(1, 2); 
}