2017-05-01 43 views
1

我有一個簡單的程序是這樣的:Observable.take()導致NPE

public class MainApp { 
    public static void main(String[] args) { 
     getAcronyms() 
       .flatMap(Observable::fromIterable) 
       .flatMap(MainApp::getTitle) 
       .filter(Objects::nonNull) 
       .subscribe(System.out::println); 

    } 

    private static Observable<List<String>> getAcronyms(){ 
     List<String> strings = new ArrayList<>(); 
     strings.add("YOLO"); 
     strings.add("LMAO"); 
     strings.add("ROFL"); 
     strings.add("AYY LMAO"); 
     return new Observable<List<String>>() { 
      @Override 
      protected void subscribeActual(Observer<? super List<String>> observer) { 
       observer.onNext(strings); 
       observer.onComplete(); 
      } 
     }; 
    } 

    private static Observable<String> getTitle(String url) { 
     return new Observable<String>() { 
      @Override 
      protected void subscribeActual(Observer<? super String> observer) { 
       observer.onNext(url + " title!"); 
       observer.onComplete(); 
      } 
     }; 
    } 
} 

這工作得很好,但是當我鏈接一個take

getAcronyms() 
     .flatMap(Observable::fromIterable) 
     .flatMap(MainApp::getTitle) 
     .filter(Objects::nonNull) 
     .take(2) 
     .subscribe(System.out::println); 

它打印2倍的值,但給我NPE:

YOLO標題!

LMAO標題!

在 io.reactivex.internal.operators.observable.ObservableTake $ TakeObserver.onComplete(ObservableTake.java:83) 線程 「main」 顯示java.lang.NullPointerException異常的 io.reactivex.internal.operators。 observable.ObservableTake $ TakeObserver.onNext(ObservableTake.java:64) 在 io.reactivex.internal.operators.observable.ObservableFilter $ FilterObserver.onNext(ObservableFilter.java:52) 在 io.reactivex.internal.operators。 observable.ObservableFlatMap $ MergeObserver.tryEmit(ObservableFlatMap.java:262) at io.reactivex.internal.operators.observable.ObservableFlatMap $ InnerObserver.onNext(Obs ervableFlatMap.java:559) at MainApp $ 2.subscribeActual(MainApp.java:41)at io.reactivex.Observable.subscribe(Observable.java:10842)at io.reactivex.internal.operators.observable.ObservableFlatMap $ MergeObserver .subscribeInner(ObservableFlatMap.java:162) 在 io.reactivex.internal.operators.observable.ObservableFlatMap $ MergeObserver.onNext(ObservableFlatMap.java:139) 在 io.reactivex.internal.operators.observable.ObservableFlatMap $ MergeObserver .drainLoop(ObservableFlatMap.java:436) 在 io.reactivex.internal.operators.observable.ObservableFlatMap $ MergeObserver.drain(ObservableFlatMap.java:323) 在 io.reactivex.internal.operators.observable.Obse rvableFlatMap $ InnerObserver.onSubscribe在 io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:55)(ObservableFlatMap.java:546) 在io.reactivex.Observable.subscribe(Observable.java:10842 )在 io.reactivex.internal.operators.observable.ObservableFlatMap $ MergeObserver.subscribeInner(ObservableFlatMap.java:162) 在 io.reactivex.internal.operators.observable.ObservableFlatMap $ MergeObserver.onNext(ObservableFlatMap.java:139) at MainApp $ 1.subscribeActual(MainApp.java:31)at io.reactivex.Observable.subscribe(Observable.java:10842)at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55 ) 在io.reactivex.Observable.subscribe(Observable.java:10842)在 io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55) 在io.reactivex.Observable.subscribe(觀察到。 java:10842)at io.reactivex.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30) at io.reactivex.Observable.subscribe(Observable.java:10842)at io.reactivex.internal。 (ObservableTake.java:30) at io.reactivex.Observable.subscribe(Observable.java:10842)at io.reactivex.Observable.subscribe(Observable.java:10828)at io。 reactivex.Observable.subscribe(觀察到。Java的:10731)在 MainApp.main(MainApp.java:18)

有人能幫助我弄清楚爲什麼發生這種情況,什麼我做錯了嗎?

回答

2

發生此異常是由於take在發出定義數量的項目後內部嘗試處置未設置的對象Disposable

因此,您必須通過在subscribeActual方法實現中調用observer.onSubscribe(disposable)創建初始Observable時提供它。但不要重新發明輪子,而且通過調用它的公共構造函數來創建Observable是爲了自定義運算符。只需使用靜態工廠方法。在你的情況下,最好的選擇是Observable.fromCallable

private static Observable<List<String>> getAcronyms(){ 
    return Observable.fromCallable(new Callable<List<String>>() { 
     @Override 
     public List<String> call() throws Exception { 
      List<String> strings = new ArrayList<>(); 
      strings.add("YOLO"); 
      strings.add("LMAO"); 
      strings.add("ROFL"); 
      strings.add("AYY LMAO"); 
      return strings; 
     } 
    }); 
} 

同時檢查這篇文章:RxJava 2 Disposable -  Under the hood

相關問題