2017-10-19 73 views
5

說我有一個API,基於一些查詢條件,會發現或構建一個小窗口:異步,在Java矢量/流數據可組合的返回值9

Widget getMatchingWidget(WidgetCriteria c) throws Throwable 

的(同步)的客戶端代碼的外觀像:

try { 
    Widget w = getMatchingWidget(criteria); 
    processWidget(w); 
} catch (Throwable t) { 
    handleError(t); 
} 

現在說找到或構建一個小部件是不可預知的昂貴,我不希望客戶端在等待它時阻塞。所以我將其更改爲:

CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c) 

客戶可以再一次將:

CompletableFuture<Widget> f = getMatchingWidget(criteria); 
f.thenAccept(this::processWidget) 
f.exceptionally(t -> { handleError(t); return null; }) 

或:

getMatchingWidget(criteria).whenComplete((t, w) -> { 
    if (t != null) { handleError(t); } 
    else { processWidget(t); } 
}); 

現在,讓我們說,而不是同步API可以返回0到n小部件:

Stream<Widget> getMatchingWidgets(WidgetCriteria c) 

天真,我可以這樣寫:

CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c) 

然而,這並不能使代碼非阻塞的,它只是推動周圍堵塞 - 無論是Future塊,直到所有的Widgets可用,或代碼遍歷Stream塊,等待每個Widget。我要的是什麼,讓我處理每個插件到達時,是這樣的:

void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor) 

但是,這不提供錯誤處理,即使我添加額外的Consumer<Throwable> errorHandler,它不會讓我例如,用其他查詢組成我的窗口小部件檢索,或者轉換結果。

因此,我正在尋找一些可組合Stream(迭代性,可轉換性)特徵與CompletableFuture(異步結果和錯誤處理)特徵相結合的組合物。 (而且,雖然我們在它,背壓可能會很好。)

這是一個java.util.concurrent.Flow.Publisher?一個io.reactivex.Observable?更復雜的東西?更簡單些?

+2

我傾向於將事情推入隊列,並將事情從隊列中抽出。 –

回答

6

您的用例非常自然地落入RxJava正在處理的世界中。如果我們有一個觀察的:

getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

可觀察鏈將在backgroundScheduler運行:

Observable<Widget> getMatchingWidgets(wc); 

,基於標準的產生零個或多個窗口小部件,那麼你可以,因爲它似乎用處理每個插件,它通常是一個線程池執行器服務的包裝器。如果你需要做的每個插件的最終處理您的UI,你可以使用observeOn()運營商處理之前切換到UI調度:

getMatchingWidgets(wc) 
    .subscribeOn(backgroundScheduler) 
    .observeOn(uiScheduler) 
    .subscribe(w -> processWidget(w), 
       error -> handleError(error)); 

對我來說,RxJava方法的優雅是它以流暢的方式處理管道管理的諸多細節。看着那個觀察者鏈,你確切地知道發生了什麼以及在哪裏。