2015-05-12 62 views
7

我想異步執行2個網絡調用 - 我正在使用Retrofit + RxJava來完成此操作。這個邏輯來自一個簡單的Runner類來測試解決方案。注意:這主要涉及服務器端的RxJava。正確使用Retrofit + RxJava的combineLatest

我的代碼如下所示:

public static void main(String[] args) throws Exception { 
    Api api = ...; 

    Observable.combineLatest(
     api.getStates(), 
     api.getCmsContent(), 
     new Func2<List<States>, CmsContent, String>() { 
     @Override public String call(List<State> states, CmsContent content) { 
      ... 
      return "PLACEHOLDER"; 
     } 
     }) 
     .observeOn(Schedulers.immediate()) 
     .subscribeOn(Schedulers.immediate()) 
     .subscribe(new Observer<String>() { 
     @Override public void onCompleted() { 
      System.out.println("COMPLETED"); 
     } 

     @Override public void onError(Throwable e) { 
      System.out.println("ERROR: " + e.getMessage()); 
     } 

     @Override public void onNext(String s) { 
      // I don't care what's returned here 
     } 
     }); 
} 

三個問題:

  1. Observable.combineLatest使用時要執行多個REST異步調用,並繼續在所有呼叫都完成了最好的操作?我的Func2實現當前返回String。在執行2個API調用之後,我將在Func2#call()方法中處理結果。我不在乎什麼是返回 - 但是,我必須有更好的方法來處理這個問題 - 我是否正確?
  2. 使用上面的代碼正確執行API調用。但是當我運行該程序時,main方法沒有用正確的Process finished with exit code 0來完成。什麼可能導致代碼掛起?

更新 - 2015年5月14日

基礎上的建議,我已經改變了邏輯如下:

public static void main(String[] args) throws Exception { 
    Api api = ...; 

    Observable.zip(
     api.getStates(), 
     api.getCmsContent(), 
     new Func2<List<States>, CmsContent, Boolean>() { 
     @Override public Boolean call(List<State> states, CmsContent content) { 
      // process data 
      return true; 
     } 
     }) 
     .subscribeOn(Schedulers.io()) 
     .toBlocking() 
     .first(); 
} 

這看起來像解決我是尋找。我打算用一段時間來看看是否遇到麻煩。

回答

1

1)沒有最好的是使用zip()。如果兩個(或更多)apis中的一個返回「較慢」的不同結果/它具有緩存的本質,則最新結合是很好的。

2)Fun2有助於合併結果。在onNext()或onError()中處理結果更好(體系結構)。您可以使用簡單的Pair<T,Y>類將結果從Func2傳遞到onNext()。

3)沒有錯。上述結果應該在onNext()中處理,而不是在onComplete中處理。根據Retrofit's source code,結果僅在onNext()中傳遞(當然是正確的)。

希望那些幫助。

5

1)如果你知道你在兩條路徑上都會有一個值,那麼它就好比zip

2)您想要做什麼?您會在Func2中獲得一對值,如果您不真正關心與onNext一起旅行的內容,請返回您選擇的值。

3)Schedulers.immediate()在某種意義上不是真正的調度程序,而且很容易出現相同的池死鎖情況。你真的不需要使用它。如果要在完成異步工作之前阻止主線程,請使用例如toBlocking().first()

+0

謝謝,我已經使用您的建議更新了我的代碼示例。 – Kasa

1

我意識到,我很擔心晚了一年就這一點,但發表在2015年5月14日的OP編輯不符合他原來的規定:

我想執行2個網絡電話異步

  1. 觀測量getStatesgetCmsContent,除非他們在單獨的線程分別認購將不會同時執行。這是他在帖子中省略的一個關鍵點,以前的答案都沒有提到。

    Observable.fromCallable(() -> doStuff()) 
        .subscribeOn(Schedulers.computation()); 
    

正如@akarnokd說,在情況下,兩個流具有單個值,zipcombineLatest行爲類似。合併函數將阻塞,直到getStatesgetCmsContent都返回,但像上面顯示的那樣,它們中的每一個都在單獨的線程上同時執行。

  • 另一種解決方案依賴於合併List<States>CmsContent到達時的能力。考慮到他的代碼,顯然有某種「數據持有者」(未顯示),因爲他返回的是Boolean,而不是合併數據。以下,forEach同時執行。

    Observable.just(api.getStates(), api.getCmsContent()) 
    // subscribe on separate thread as shown previously 
    .flatMap(this::buildObservable) 
    .toBlocking() 
    // executes concurrently 
    .forEach(item -> { 
        // merge into "data holder"   
    }); 
    
  • 當然,該代碼具有不被強類型所以這是一個作出選擇的問題。

    相關問題