我正在嘗試反應堆庫,我無法弄清楚爲什麼在單聲道以下永遠不會返回onNext或onComplete調用。我想我錯過了非常平凡的事情。這是一個示例代碼。無法接收onNext和onComplete訂閱的單聲道呼叫
MyServiceService service = new MyServiceService();
service.save("id")
.map(myUserMono -> new MyUser(myUserMono.getName().toUpperCase(), myUserMono.getId().toUpperCase()))
.subscribe(new Subscriber<MyUser>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("Subscribed!" + Thread.currentThread().getName());
}
@Override
public void onNext(MyUser myUser) {
System.out.println("OnNext on thread " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable t) {
System.out.println("onError!" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onCompleted!" + Thread.currentThread().getName());
}
});
}
private static class MyServiceService {
private Repository myRepo = new Repository();
public Mono<MyUser> save(String userId) {
return myRepo.save(userId);
}
}
private static class Repository {
public Mono<MyUser> save(String userId) {
return Mono.create(myUserMonoSink -> {
Future<MyUser> submit = exe.submit(() -> this.blockingMethod(userId));
ListenableFuture<MyUser> myUserListenableFuture = JdkFutureAdapters.listenInPoolThread(submit);
Futures.addCallback(myUserListenableFuture, new FutureCallback<MyUser>() {
@Override
public void onSuccess(MyUser result) {
myUserMonoSink.success(result);
}
@Override
public void onFailure(Throwable t) {
myUserMonoSink.error(t);
}
});
});
}
private MyUser blockingMethod(String userId) throws InterruptedException {
Thread.sleep(5000);
return new MyUser("blocking", userId);
}
}
以上代碼僅打印Subcribed!main。什麼我無法弄清楚是爲什麼在未來的回調不會通過myUserMonoSink.success