2017-04-19 59 views
0

我在RxJava有當製片人(API請求)發射事件過快,消費(API響應)一個奇怪的問題熱觀測只獲得了API請求的第二個響應。Rxjava -

比方說,我有一對夫婦需要向服務器發送查詢數據庫的請求,因爲一些來自服務器的查詢需要一定的時間。所以當我得到迴應時,第二個請求可能會首先返回,而奇怪的是有時候我沒有得到第一個請求的響應。

這使得API調用的代碼:

public void sendRequests() { 
    // using RxJava to make server polling. 
    startPollingServer(); 
} 

的startPollingServer()方法是使用RxJava從服務器輪詢數據。

public void startPollingServer() { 
    mApiService.getPollingFromServer() 
     .retryWhen() 
     .repeatWhen() 
     .map() 
     .subscribeOn() 
     .observeOn() 
     .subscribe(
      // call onNext 
      // call onError 
     ) 
} 

然而,當sendRequests()是調用太快,既不onNext,的onComplete,onError的被稱爲。第一次請求沒有任何事情發生。 但是我確實得到了CharlesProxy的第一個迴應,這真的很奇怪。

所以我的問題是,有沒有可能RxJava不理會我的第一反應?我需要合併來自startPollingServer()的Observables嗎?

+0

您是否在Android應用程序或測試中使用它?因爲如果它是一個測試,它將只是通過,因爲調用線程將訂閱,只是通過方法startPollingServer。不建議僅以無效方式訂閱。如果你不處理訂閱,你會得到內存泄漏。你的startPolling方法應該返回Observable <>並在一個地方編寫。 –

+0

我在android應用程序中使用,sendRequests()由用戶啓動,我模擬用戶是否按下按鈕或某個用戶​​界面非常快。它只是顯示了一些代碼片段,我有代碼來處理預訂應用程序被銷燬時的訂閱,所以沒有內存泄漏 – Cheng

+0

爲什麼你確定它是RxJava問題,服務器是否確實在每種情況下都返回響應?似乎你在這裏有獨立的觀察者流。 – yosriz

回答

0

據我2年以上與RxJava經驗它是非常不太可能失去一些數據。 考慮下面的事情:

誤差被抑制

使用retryWhen和repeatWhen可能導致錯誤的抑制,嘗試添加 '的onError()' 操作,以便對付那些:

public void startPollingServer() { 
    mApiService.getPollingFromServer() 
     .doOnError(throwable -> log.error("Got an error", throwable)) // catch error 
     .retryWhen() 
     .repeatWhen() 
     .map() 
     .subscribeOn() 
     .observeOn() 
     .subscribe(
      // call onNext 
      // call onError 
     ) 
} 

http://reactivex.io/RxJava/javadoc/rx/Observable.html#doOnError(rx.functions.Action1)

不正確的平行度

Observable.just(1,2,3) 
.flatMap(i -> doNetworkCall(i)) 
.first() 

這樣的代碼可能會導致的「doNetworkCall(I)」,其中第二應答可能會比第一速度更快的並行執行。爲了驗證,使用 'concatMap' 來代替,這保證了與被處理的流的順序:http://reactivex.io/documentation/operators/concat.html

調試

算 'doOnNext()', 'doOnSubscribe()', 'doOnCompleted()' 和' doOnError()'可能會幫助您找到錯誤

Observable.just(1,2,3) 
.flatMap(i -> doNetworkCall(i) 
      .doOnSubscribe(() -> log.debug("Launched {}", i)) 
      .doOnNext(response -> log.debug("Got response {} for {}", response, i)) 
      .doOnError(throwable -> log.error("For error for " + i, throwable)) 
      .doOnComplete(() -> log.info("Finished processing of {}", i)) 
) 
.first()