2017-07-04 73 views
0

我有一堆觀察對象,我在一個方法中運行/訂閱(大約9個確切的),它本身返回一個可觀察值。爲了這個問題的目的,我把它縮小到2個觀察值。這是我創建的方法,它返回一個包含其他可觀察對象的Observable。使用多個observables創建可觀察?

public static Observable<MyCustomObject> runAll(Service networkService) { 
     return Observable.create(subscriber -> { 

      networkService.getOne().subscribe(response -> { 
       Request request = response.raw().request(); 
       MyCustomObject case = new MyCustomObject(request); 
       subscriber.onNext(case); 
      }, exception -> { 
       throw new OnErrorNotImplementedException(exception); 
      }); 


      networkService.getTwo().subscribe(response -> { 
       Request request = response.raw().request(); 
       MyCustomObject case = new MyCustomObject(request); 
       subscriber.onNext(case); 
      }, exception -> { 
       throw new OnErrorNotImplementedException(exception); 
      }); 

      subscriber.onComplete(); 
     }); 
    } 

我然後使用可觀察到的是所返回......

 runAll(networkService) 
.subscribeOn(Schedulers.io()) 
.subscribe(case -> { 
      //do stuff 

     }, new Consumer<Throwable>() { 
      @Override 
      public void accept(Throwable throwable) throws Exception { 
       //handle error 
      } 
     }); 

我不知道如果我正確地創建一個可觀察的。我基本上用Observable替換了我之前在這裏的一個監聽器/接口,但是我知道使用Observable.create()是不容易的。我應該以另一種方式去做嗎?做我在做什麼有什麼不對嗎?

編輯:getOne()和getTwo()是網絡調用,所以他們返回Observables。

編輯2:目前有這樣的

public static Observable<MyCustomObject> quickRun(Service networkService) { 
    return Observable.concat(
      networkService.getOne().map(response -> { 
       Request request = response.raw().request(); 
       MyCustomObject case = new MyCustomObject(request); 
       return case; 
      }), 
      networkService.getTwo().map(response -> { 
       Request request = response.raw().request(); 
       MyCustomObject case = new MyCustomObject(request); 
       return case; 
      }) 
    ); 
} 
+0

是否所有9個'Observable's返回相同的類型?他們互相依賴,還是可以獨立運行? –

+0

可能有一個observable返回一個不同的類型,但大多數情況下我可以使它工作(我可以將兩個結果都包裝在另一個類型或其他類型中),並且每個都可以獨立運行。 – EGHDK

回答

0

是的,你應該做它用不同的方式。
每當你將Observable視爲像常規的異步回調那樣,這是一種你沒有做出反應的氣味。
正如你所說Observable.create()是不平凡的方式,創造Observable的一些陷阱,也就是說,你可能使用舊版本的RxJava 1,與更新的版本(1.3+我認爲),並與RxJava2,創建基於發射器並且更安全。你可以閱讀here關於Create和發射器方法的缺陷。 (作爲RxJava2的附註,有另一種方式extending Observable)。

所有這些方法都是在異步回調世界與反應世界之間架起一道橋樑,並用Observable包裝任何類型的異步操作。至於你的情況,因爲你手頭已經有Observables,所以你需要將它們組合成單一的流 - Observable。 Rx有很多運營商爲此目的,根據你的例子Observable.merge似乎是合適的運營商,你所有Observable將異步運行(注意,你將需要適用於他們每個人的IO調度器),並且每個Observable將發射它在合併流上的結果,當所有的Observable將完成時 - onCompleted將在合併Observable上調用,這在您的示例中是錯誤的,因爲它在剛啓動所有任務之後才被調用。

Observable.merge(
     networkService.getOne() 
       .subscribeOn(Schedulers.io()), 
     networkService.getTwo() 
       .subscribeOn(Schedulers.io()) 
) 
     .map(response -> { 
      Request request = response.raw().request(); 
      return new MyCustomObject(request); 
     }) 
     .subscribe(customObject -> { 
        //do stuff 
       }, throwable -> { 
        //handle error 
       } 
     ); 
+0

其實,就我而言,我希望他們都能同步運行......這是怎麼改變的? – EGHDK

+0

同樣的方法也適用,只是用不同的運營商 - 'Observable.concat()' – yosriz

+0

所以我會做'Observable.concat( networkService.getOne() .MAP(), networkService.getTwo() .MAP () )'?不確定你通過合併定義的順序,然後映射... – EGHDK