我想,我得從觀測量+ Couchbase異步API腦爆炸:) 可能有人幫助我,好嗎? 已經和批量操作打了幾天,並且仍然無法理解如何通過適當的錯誤處理來完成批量操作。Couchbase異步批錯誤處理
比方說,我要更新Couchbase散裝一些文件。 如果我使用同步API,它看起來像:
List< JsonDocument> items = getItems(1, 2, 3, 4, 5);
// getItems - some method which calls bucket.get() for specified keys
for (JsonDocument item : items) {
try {
try {
item.content().put("age", 42);
bucket.replace(item);
} catch (CASMismatchException e) {
// retry
bucket.get(item.id()).content().put("age", 42);
bucket.replace(item);
}
} catch (Exception e) {
// handle error which doesn't stop execution for other items
// for example, add item id to list of failed items in response
errorHandler.handleError(item.id(), e);
}
}
但這不是平行的,和文檔說異步API更有效。 我不能理解的是,如何通過建立這樣的觀測量流量,我想:
Observable.from(items)
.flatMap(item -> {
item.content().put("age", 42);
return bucket.async().replace(item);
})
.onErrorResumeNext(error -> {
// what to do? return another observable which does retry logic above?
// how do I know what item has failed?
// I don't have ID of that item, nor I can extract it from passed Exception
// why onErrorResumeNext is getting called only once (if one item fails)
// and is not called for other items?
})
.subscribe(); // also need Subscriber with onError (otherwise there are warnings in log)
任何幫助將非常感激! 感謝
我想你最好需要通過Observable.create建立可觀察與嘗試捕捉然後直接重試,如果重試工作發出這個項目,如果沒有則發出錯誤。 –