2017-03-10 61 views
0

我正在嘗試反應堆庫,我無法弄清楚爲什麼在單聲道以下永遠不會返回onNext或onComplete調用。我想我錯過了非常平凡的事情。這是一個示例代碼。無法接收onNext和onComplete訂閱的單聲道呼叫

MyServiceService service = new MyServiceService(); 
    service.save("id") 
      .map(myUserMono -> new MyUser(myUserMono.getName().toUpperCase(), myUserMono.getId().toUpperCase())) 
      .subscribe(new Subscriber<MyUser>() { 
       @Override 
       public void onSubscribe(Subscription s) { 
        System.out.println("Subscribed!" + Thread.currentThread().getName()); 
       } 

       @Override 
       public void onNext(MyUser myUser) { 
        System.out.println("OnNext on thread " + Thread.currentThread().getName()); 

       } 

       @Override 
       public void onError(Throwable t) { 
        System.out.println("onError!" + Thread.currentThread().getName()); 

       } 

       @Override 
       public void onComplete() { 
        System.out.println("onCompleted!" + Thread.currentThread().getName()); 

       } 
      }); 


} 

private static class MyServiceService { 
    private Repository myRepo = new Repository(); 

    public Mono<MyUser> save(String userId) { 
     return myRepo.save(userId); 
    } 
} 

private static class Repository { 

    public Mono<MyUser> save(String userId) { 
     return Mono.create(myUserMonoSink -> { 
      Future<MyUser> submit = exe.submit(() -> this.blockingMethod(userId)); 
      ListenableFuture<MyUser> myUserListenableFuture = JdkFutureAdapters.listenInPoolThread(submit); 
      Futures.addCallback(myUserListenableFuture, new FutureCallback<MyUser>() { 
       @Override 
       public void onSuccess(MyUser result) { 
        myUserMonoSink.success(result); 
       } 

       @Override 
       public void onFailure(Throwable t) { 
        myUserMonoSink.error(t); 
       } 
      }); 
     }); 
    } 

    private MyUser blockingMethod(String userId) throws InterruptedException { 
     Thread.sleep(5000); 
     return new MyUser("blocking", userId); 
    } 
} 

以上代碼僅打印Subcribed!main。什麼我無法弄清楚是爲什麼在未來的回調不會通過myUserMonoSink.success

回答

2

重要的推動價值要記住的是,一個FluxMono異步,大部分的時間。

訂閱後,保存用戶的異步處理將從執行程序開始,但在.subscribe(...)後繼續執行主代碼。

因此,main線程退出,在任何內容被推送到Mono之前終止您的測試。

[側欄]:什麼時候有同步?

當數據來源是Flux/Mono同步工廠方法時。但增加的先決條件是運營商鏈的其餘部分不會切換執行上下文。這可能會明確地發生(您使用publishOnsubscribeOn運算符)或隱式(某些運算符(如時間相關運算符),例如delayElements,運行在單獨的Scheduler上)。

簡而言之,您的源代碼在exeExecutorService線程中運行,因此Mono確實是異步的。另一方面,您的片段在main上運行。

如何解決這個問題

觀察實驗的Mono正確的行爲(而不是在生產中完全異步代碼),有幾個可能性:

  • 保持subscribe與system.out.printlns,但在onCompleteonError內部添加一個new CountDownLatch(1).countDown()await之後的倒計數鎖存器subscribe
  • 使用.log().block()而不是.subscribe(...)。您將失去對每個事件做什麼的定製,但log()將爲您打印出來(如果您已配置了日誌框架)。 block()將恢復到阻塞模式,並完成我上面提到的CountDownLatch。它返回一次可用的值,或者在出錯時拋出一個Exception。的
  • 代替log()您可以自定義日誌記錄或使用.doOnXXX(...)方法等副作用(有一對幾乎每一個類型的事件事件+的組合,如:doOnSubscribedoOnNext ...)

如果」重新進行單元測試,使用reactor-tests項目中的StepVerifier。當您致電.verify()時,它將訂閱flux/mono並等待事件。請參閱參考指南chapter on testing(以及一般參考指南的其餘部分)。

1

問題是在創建的匿名類onSubscribe方法中什麼都不做。 如果你看看LambdaSubscriber的實現,它會請求一些事件。 因爲它有一些預定義的邏輯,所以它更容易擴展BaseSubscriber

所以你的用戶實現將是:

MyServiceService service = new MyServiceService(); 
service.save("id") 
     .map(myUserMono -> new MyUser(myUserMono.getName().toUpperCase(), myUserMono.getId().toUpperCase())) 
     .subscribe(new BaseSubscriber<MyUser>() { 
       @Override 
       protected void hookOnSubscribe(Subscription subscription) { 
        System.out.println("Subscribed!" + Thread.currentThread().getName()); 
        request(1); // or requestUnbounded(); 
       } 

       @Override 
       protected void hookOnNext(MyUser myUser) { 
        System.out.println("OnNext on thread " + Thread.currentThread().getName()); 

        // request(1); // if wasn't called requestUnbounded() 2 
       } 

       @Override 
       protected void hookOnComplete() { 
        System.out.println("onCompleted!" + Thread.currentThread().getName()); 
       } 

       @Override 
       protected void hookOnError(Throwable throwable) { 
        System.out.println("onError!" + Thread.currentThread().getName()); 
       } 

      }); 

也許它不是最好的實現,我是新來的太反應堆。

Simon的答案對測試異步代碼有很好的解釋。