2016-02-14 79 views
4

作爲RxAndroid入門的示例,我試圖實現一個搜索框,當用戶插入某些內容時會觸發一個休息呼叫。組合RxTextView Observable和Retrofit Observable

到目前爲止,我有兩個工作部分。第一觀察EditTextView ...

RxTextView.textChangeEvents(searchEditText) 
    .debounce(400, TimeUnit.MILLISECONDS) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(new Observer<TextViewTextChangeEvent>() { 
      @Override 
      public void onCompleted() { 
       Timber.d("onCompleted"); 
      } 

      @Override 
      public void onError(Throwable e) { 
       Timber.e(e, "onError"); 
       } 

      @Override 
      public void onNext(TextViewTextChangeEvent e) { 
       Timber.d("onNext" + e.text().toString()); 
      } 
     }); 

...和第二部分通過使用改造服務調用REST API:

APIManager.getService().searchRestaurants("test") 
     .subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(new Observer<List<Restaurant>>() { 
      @Override 
      public void onCompleted() { 
       Timber.d("onCompleted"); 
      } 

      @Override 
      public void onError(Throwable e) { 
       Timber.e(e, "onError"); 
      } 

      @Override 
      public void onNext(List<Restaurant> restaurants) { 
       Timber.d("onNext"); 
       for (Restaurant restaurant : restaurants) { 
        Timber.d(restaurant.getId() + ": " + restaurant.getName()); 
       } 
      } 
     }); 

我的問題是兩個部分相結合。我試圖通過使用flatMap操作如下:

RxTextView.textChangeEvents(searchEditText) 
     .debounce(400, TimeUnit.MILLISECONDS) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .flatMap(new Func1<TextViewTextChangeEvent, Observable<List<Restaurant>>>() { 
      @Override 
      public Observable<List<Restaurant>> call(TextViewTextChangeEvent txtChangeEvt) { 
       return APIManager.getService().searchRestaurants(txtChangeEvt.text().toString()); 
      } 
     }) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(new Observer<List<Restaurant>>() { 
      @Override 
      public void onCompleted() { 
       Timber.d("onCompleted"); 
      } 

      @Override 
      public void onError(Throwable e) { 
       Timber.e(e, "onError"); 
      } 

      @Override 
      public void onNext(List<Restaurant> restaurants) { 
       Timber.d("onNext"); 
       for (Restaurant restaurant : restaurants) { 
        Timber.d(restaurant.getId() + ": " + restaurant.getName()); 
       } 
      } 
     }); 

當我做到這一點我得到的異常以下:

java.lang.IllegalStateException: Must be called from the main thread. Was: Thread[RxCachedThreadScheduler-1,5,main] 
                       at com.jakewharton.rxbinding.internal.Preconditions.checkUiThread(Preconditions.java:28) 
                       at com.jakewharton.rxbinding.widget.TextViewTextChangeEventOnSubscribe.call(TextViewTextChangeEventOnSubscribe.java:21) 
                       at com.jakewharton.rxbinding.widget.TextViewTextChangeEventOnSubscribe.call(TextViewTextChangeEventOnSubscribe.java:12) 

所以,我想通過調用subscribeOn(AndroidSchedulers.mainThread()但在這種情況下解決這個問題,當然,我得到一個NetworkOnMainThread異常。

那麼我該怎麼做? 什麼是合適的方式來結合不同的觀察對象,這些觀察對象應該在不同的線程上執行?

回答

3

只要刪除第一個.observeOn(AndroidSchedulers.mainThread())。看看這個example

Observable.just(1) // 1 will be emited in the IO thread pool 
    .subscribeOn(Schedulers.io()) 
    .flatMap(...) // will be in the IO thread pool 
    .observeOn(Schedulers.computation()) 
    .flatMap(...) // will be executed in the computation thread pool 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(); // will be executed in the Android main thread (if you're running your code on Android) 
+0

它是正確的,我只是第一個觀察到的使用subscribeOn一次,然後用observeOn定義所有以下觀測量的調度?我進一步玩了observeOn和subscribeOn,似乎只有第一個subscribeOn有效,所有跟在subscribeOn的呼叫被observeOn覆蓋。我觀察到了嗎? – Sven

+0

@Sven我記得,只有一個'subscribeOn'會生效。如果多次調用'subscribeOn',將會使用定義的最後一個定義。 – MyDogTom