2017-03-08 43 views
0

我已經開始使用項目反應器了,並且希望將我們的API之一移動到被動的處理方式。我想知道什麼是處理像ListenableFuture的東西。在項目反應器中處理ListenableFuture

在我的情況下,我使用Cassandra,當我調用session.executeAsync()時,這將返回一個ResultSetFuture,它擴展了ListenableFuture。 下面是我現在編碼的例子,我似乎並不滿意將ListenableFuture公開給客戶端。

public Mono<ListenableFuture<Void> save(Publisher<AccountDTO> accountPublisher) { 
return Mono.just(accountPublisher) 
.map(accountDTO -> { 
       Account accountEntity = modelMapper.map(accountDTO, Account.class); 
       return mappingManager.mapper(Account.class).saveAsync(accountEntity); 
      }) 
      .retry(1) 
      .doOnError(throwable -> log.error("Unable to create account ")) 
      .mapError(throwable -> new MyCustomException("") 

}

我的問題是,它是一個很好的做法,暴露ListenableFuture,我個人並不想給這樣的返回給客戶端,他們可以阻止任何東西。有沒有更好的方法來處理這個項目反應器,我可以只返回一個單聲道?

回答

2

通過使用工廠方法Mono.create(),您可以輕鬆地橋接ListenableFuture<Void>異步API,而代之以公開Mono<Void>。該方法採用Consumer<Sink>,您所提供的拉姆達是:

  1. 添加成功監聽到通話sink.success()(因爲沒有實際價值,或者您也可以撥打success(aVoid)與收到的Void值未來聽衆)
  2. 添加失敗監聽未來它調用sink.error(failure)

這幾乎是它!參見create參考文檔(雖然這一個提到Flux版本,這是更復雜一點由於必須處理多個值):http://projectreactor.io/docs/core/release/reference/docs/index.html#producing.create

+1

感謝@Simon指導我在此。這有幫助。 – Coder

0

發帖代碼片段,我編碼爲通過上述@Simon引導。

@Override 
public Mono<Void> save(AccountDTO accountDTO) { 
    return Mono.create(voidMonoSink -> { 

     Account account = converter.map(accountDTO, Account.class); 

     ListenableFuture<Void> voidListenableFuture = mappingManager.mapper(Account.class).saveAsync(account); 

     Futures.addCallback(voidListenableFuture, new FutureCallback<Void>() { 

      @Override 
      public void onSuccess(Void result) { 
       voidMonoSink.success(result); 
      } 

      @Override 
      public void onFailure(Throwable t) { 
       log.error("Unable to save account " + accountDTO, t); 
       voidMonoSink.error(new MyCustomException()); 
      } 
     }); 
    }); 
} 
0

在Java 8中使用lambda表達式,如果你正在尋找西蒙的答案創建流量使用ListenableFutures的實現,

Flux.create(fluxSink -> { 
      future.addCallback(
        result -> { 
         fluxSink.next(result); 
         fluxSink.complete(); 
        }, 
        ex -> fluxSink.error(ex)); 
     });