2017-07-05 124 views
1

我得到拋出OnErrorNotImplementedException和應用程序崩潰,儘管下游處理錯誤(?)。OnErrorNotImplementedException使用RxJava2和Retrofit2莫斯MVI

異常

E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1 
Process: pl.netlandgroup.smartsab, PID: 9920 
io.reactivex.exceptions.OnErrorNotImplementedException: HTTP 401 Unauthorized 
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704) 
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701) 
    at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:74) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119) 
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119) 
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onError(ObservableSubscribeOn.java:63) 
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:56) 
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37) 
    at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43) 
    at io.reactivex.Observable.subscribe(Observable.java:10838) 
    at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) 
    at io.reactivex.Observable.subscribe(Observable.java:10838) 
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96) 
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452) 
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61) 
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) 
    at java.lang.Thread.run(Thread.java:761) 
Caused by: retrofit2.adapter.rxjava2.HttpException: HTTP 401 Unauthorized 
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:54) 
    at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(BodyObservable.java:37)  
    at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:43)  
    at io.reactivex.Observable.subscribe(Observable.java:10838)  
    at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)  
    at io.reactivex.Observable.subscribe(Observable.java:10838)  
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)  
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)  
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)  
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)  
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)  
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)  
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)  
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)  
    at java.lang.Thread.run(Thread.java:761)  

改造存儲庫:

class RetrofitRepository (retrofit: Retrofit) { 

    val apiService: ApiService = retrofit.create(ApiService::class.java) 
    var size: Int = 0 

    fun getMapResponse(pageIndex: Int = 0): Observable<MapResponse> { 
     return apiService.getMapResponse(pageIndex = pageIndex) 
       .doOnError { Log.d("error", it.message) } 
       .doOnNext { Log.d("currThread", Thread.currentThread().name) } 

    } 

    fun getItemsFormResponses(): Observable<List<Item>> { 
     val list = mutableListOf<Observable<List<Item>>>() 
     val resp0 = getMapResponse() 
     resp0.subscribe { size = it.totalCount } 
     var accum = 0 
     do { 
      list.add(getMapResponse(accum).map { it.items }) 
      accum++ 
     } while (list.size*200 < size) 
     return Observable.merge(list) 
    } 
} 

此結果是由交互器觀察到:

class MapInteractor @Inject constructor(private val repository: RetrofitRepository) { 

    fun getMapItems(): Observable<MapViewState> { 
     return repository.getItemsFormResponses() 
       .map { 
        if(it.isEmpty()) { 
         [email protected] MapViewState.EmptyResult() 
        } else { 
         val mapItems = it.map { it.toMapItem() } 
         [email protected] MapViewState.MapResult(mapItems) 
        } 
       } 
       .doOnNext { Log.d("currThread", Thread.currentThread().name) } 
       .startWith(MapViewState.Loading()) 
       .onErrorReturn { MapViewState.Error(it) } 
    } 
} 

onErrorReturn { MapViewState.Error(it) }正確地發射(應用程序崩潰前右我可以看到正確的東西呈現在屏幕上)。我怎樣才能避免這種異常,同時仍然保持MVI架構?

編輯

answer provided by dimsuz是正確的解決方案,雖然實現了合併,並返回一個可觀察的所有物品也必須進行修改,以這樣的:

fun getMapItems(): Observable<List<Item>> { 
    return getMapResponse().map { 
     val size = it.totalCount 
     val list = mutableListOf<Observable<List<Item>>>() 
     var accum = 0 
     do { 
      list.add(getMapResponse(accum++).map { it.items }) 
     } while (list.size*200 < size) 
     [email protected] list.zip { it.flatten() } 
    }.mergeAll() 
} 
+0

我不能馬上發現不對勁。您可以添加整個堆棧跟蹤和演示的整個代碼(或者你的項目的鏈接,如果它是開源的,即在GitHub上)? – sockeqwe

回答

2

我認爲錯誤的是沿着getItemsFromResponse()排列:

val resp0 = getMapResponse() 
    resp0.subscribe { size = it.totalCount } 

在這裏你訂閱,bu不要處理錯誤情況。 其實這個代碼是不正確的,因爲你打破兩個獨立片的Rx鏈,你不應該這樣做。

你應該做的是這樣的:

fun getItemsFormResponses(): Observable<List<Item>> { 
    return getMapResponse().map { resp0 -> 
    val size = resp0.totalCount 

    val list = mutableListOf<Observable<List<Item>>>() 
    var accum = 0 
    do { 
     list.add(getMapResponse(accum).map { it.items }) 
     accum++ 
    } while (list.size*200 < size) 
    return Observable.merge(list) 
    } 
} 

即通過與操作者延伸的鏈,而然後用subscribe()打破它提取size