0

受到T.Nurkiewicz的「用RxJava進行反應式編程」的啓發,我嘗試將它應用到我正在處理的項目中,這是我面臨的問題。RxJava:OnErrorFailedException。識別正確的原因

我有一個Rest端點,它接受一個輸入流和一個用戶名,並且爲更新的用戶名返回一個鏈接或返回一個錯誤的請求錯誤。以下是我想實現這個使用RxJava:

@PUT 
    @Path("{username}") 
    public Response updateCredential(@PathParam("username") final String username, InputStream stream) { 
     CredentialCandidate candidate = new CredentialCandidate(); 
     Observable.just(repository.getByUsername(username)) 
       .subscribe(
        credential -> { 
          serializeCandidate(candidate, stream); 
          try { 
           repository.updateCredential(build(credential, candidate)); 
          } catch (Exception e) { 
           String msg = "Failed to update credential +\""+username+"\": "+e.getMessage(); 
           throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build()); 
          } 
         }, 
         ex -> { 
          String msg = "Couldn't update credential \""+username+"\"" 
          + ". A credential with such username doesn't exist: " + ex.getMessage(); 
          logger.error(msg); 
          throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build()); 
       });//if the Observable completes without exceptions we have a success case 
     Map<String, String> map = new HashMap<>(); 
     map.put("path", "credential/" + username); 
     return Response.ok(getJsonRepr("link", uriGenerator.apply(appsUriBuilder, map).toASCIIString())).build(); 
} 

我的問題是在該行11(onNext方法的catch子句)。這是日誌輸出,迅速將展示發生了什麼:

19:23:50.472 [http-listener(4)] ERROR com.vgorcinschi.rimmanew.rest.services.CredentialResourceService    - Couldn't update credential "admin". A credential with such username doesn't exist: Failed to update credential +"admin": Password too weak! 

所以在onNext方法拋出的異常用於上游和結束行動的onError方法!顯然this works as designed,但我很困惑,我可以如何返回錯誤請求錯誤的正確原因。畢竟在我的測試用例中,存儲庫找到了用戶的憑證,但是正確的錯誤是建議的密碼太弱。這是產生錯誤的helper方法:

private Credential build(Credential credential, CredentialCandidate candidate) { 
     if(!isOkPsswd.test(candidate.getPassword())){ 
      throw new BadRequestException("Password too weak!", Response.status(Response.Status.BAD_REQUEST).build()); 
     } 
... 
} 

我仍然相當新的反應式編程,所以我意識到我可能會丟失的東西是顯而易見的。瀏覽本書並沒有讓我得到答案,所以我會很感激任何幫助。

以防萬一,這是完整的堆棧跟蹤:

updateCredentialTest(com.vgorcinschi.rimmanew.services.CredentialResourceServiceTest) Time elapsed: 0.798 sec <<< ERROR! 
rx.exceptions.OnErrorFailedException: Error occurred when trying to propagate error to Observer.onError 
    at com.vgorcinschi.rimmanew.rest.services.CredentialResourceService.lambda$updateCredential$9(CredentialResourceService.java:245) 
    at rx.internal.util.ActionSubscriber.onNext(ActionSubscriber.java:39) 
    at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134) 
    at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:276) 
    at rx.Subscriber.setProducer(Subscriber.java:209) 
    at rx.Subscriber.setProducer(Subscriber.java:205) 
    at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:138) 
    at rx.internal.util.ScalarSynchronousObservable$JustOnSubscribe.call(ScalarSynchronousObservable.java:129) 
    at rx.Observable.subscribe(Observable.java:10238) 
    at rx.Observable.subscribe(Observable.java:10205) 
    at rx.Observable.subscribe(Observable.java:10045) 
    at com.vgorcinschi.rimmanew.rest.services.CredentialResourceService.updateCredential(CredentialResourceService.java:238) 
    at com.vgorcinschi.rimmanew.services.CredentialResourceServiceTest.updateCredentialTest(CredentialResourceServiceTest.java:140) 

回答

3

這似乎你沒有掌握無編程原理權。

第一件事是,Observable是通過他們的API是異步的,而你正試圖強制執行是同步的API,通過嘗試直接從方法返回Response值,而不是返回Observable<Response>隨着時間的推移射向Response值通過其onNext()通知。
這就是爲什麼你要處理異常的原因,爲了創建符合某些規則的適當流(Observable contract),每個通知lambda方法(onNext/onError)被Observable機制封裝,其中一些預期行爲是錯誤應該重定向到onError()方法,這是一個異常捕獲方法,你不應該扔到那裏,拋出那裏將被認爲是致命的錯誤,並且會被拋出OnErrorFailedException吞下。

理想的情況下它會是這樣的:爲了使訂閱時(而Observable.just(repository.getByUsername(username))將起到同步時可觀察的是結構)的要求發生

public Observable<Response> updateCredential(@PathParam("username") final String username, 
              InputStream stream) { 
    rerurn Observable.fromCallable(() -> { 
     CredentialCandidate candidate = new CredentialCandidate(); 
     Credential credential = repository.getByUsername(username); 
     serializeCandidate(candidate, stream); 
     repository.updateCredential(build(credential, candidate)); 
     Map<String, String> map = new HashMap<>(); 
     map.put("path", "credential/" + username); 
     return Response.ok(getJsonRepr("link", uriGenerator.apply(appsUriBuilder, map).toASCIIString())).build(); 
    }) 
      .onErrorReturn(throwable -> { 
       String msg = "Failed to update credential +\"" + username + "\": " + e.getMessage(); 
       throw new BadRequestException(msg, Response.status(Response.Status.BAD_REQUEST).build()); 
      }); 
} 

使用fromCallable,成功路徑withing可調用本身,如果發生任何錯誤,您將使用onErrorReturn運算符將其轉換爲您的自定義異常。

用他的方法你將返回Observable對象,當你訂閱它時,你將獲得Observable的所有好處和反應方法,例如能夠將它與其他一些操作組合起來,能夠從外部指定它是否會同步(當前線程)或某個其他線程上的異步(使用Scheduler)。

關於反應式編程的更多詳細解釋,我建議從AndréStaltz的這個偉大的tutorial開始。