說我有一個API,基於一些查詢條件,會發現或構建一個小窗口:異步,在Java矢量/流數據可組合的返回值9
Widget getMatchingWidget(WidgetCriteria c) throws Throwable
的(同步)的客戶端代碼的外觀像:
try {
Widget w = getMatchingWidget(criteria);
processWidget(w);
} catch (Throwable t) {
handleError(t);
}
現在說找到或構建一個小部件是不可預知的昂貴,我不希望客戶端在等待它時阻塞。所以我將其更改爲:
CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)
客戶可以再一次將:
CompletableFuture<Widget> f = getMatchingWidget(criteria);
f.thenAccept(this::processWidget)
f.exceptionally(t -> { handleError(t); return null; })
或:
getMatchingWidget(criteria).whenComplete((t, w) -> {
if (t != null) { handleError(t); }
else { processWidget(t); }
});
現在,讓我們說,而不是同步API可以返回0到n小部件:
Stream<Widget> getMatchingWidgets(WidgetCriteria c)
天真,我可以這樣寫:
CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)
然而,這並不能使代碼非阻塞的,它只是推動周圍堵塞 - 無論是Future
塊,直到所有的Widgets
可用,或代碼遍歷Stream
塊,等待每個Widget
。我要的是什麼,讓我處理每個插件到達時,是這樣的:
void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)
但是,這不提供錯誤處理,即使我添加額外的Consumer<Throwable> errorHandler
,它不會讓我例如,用其他查詢組成我的窗口小部件檢索,或者轉換結果。
因此,我正在尋找一些可組合Stream
(迭代性,可轉換性)特徵與CompletableFuture
(異步結果和錯誤處理)特徵相結合的組合物。 (而且,雖然我們在它,背壓可能會很好。)
這是一個java.util.concurrent.Flow.Publisher?一個io.reactivex.Observable?更復雜的東西?更簡單些?
我傾向於將事情推入隊列,並將事情從隊列中抽出。 –