2017-04-12 89 views
0

我想,我得從觀測量+ Couchbase異步API腦爆炸:) 可能有人幫助我,好嗎? 已經和批量操作打了幾天,並且仍然無法理解如何通過適當的錯誤處理來完成批量操作。Couchbase異步批錯誤處理

比方說,我要更新Couchbase散裝一些文件。 如果我使用同步API,它看起來像:

List< JsonDocument> items = getItems(1, 2, 3, 4, 5); 
// getItems - some method which calls bucket.get() for specified keys 

for (JsonDocument item : items) { 
    try { 
     try { 
     item.content().put("age", 42); 
     bucket.replace(item); 
     } catch (CASMismatchException e) { 
     // retry 
     bucket.get(item.id()).content().put("age", 42); 
     bucket.replace(item); 
     } 
    } catch (Exception e) { 
     // handle error which doesn't stop execution for other items 
     // for example, add item id to list of failed items in response 
     errorHandler.handleError(item.id(), e); 
    } 
} 

但這不是平行的,和文檔說異步API更有效。 我不能理解的是,如何通過建立這樣的觀測量流量,我想:

Observable.from(items) 
.flatMap(item -> { 
    item.content().put("age", 42); 
    return bucket.async().replace(item); 
}) 
.onErrorResumeNext(error -> { 
    // what to do? return another observable which does retry logic above? 
    // how do I know what item has failed? 
    // I don't have ID of that item, nor I can extract it from passed Exception 
    // why onErrorResumeNext is getting called only once (if one item fails) 
    // and is not called for other items? 
}) 
.subscribe(); // also need Subscriber with onError (otherwise there are warnings in log) 

任何幫助將非常感激! 感謝

+1

我想你最好需要通過Observable.create建立可觀察與嘗試捕捉然後直接重試,如果重試工作發出這個項目,如果沒有則發出錯誤。 –

回答

0

你可以做這樣的事情:

Observable.from(items) 
      .flatMap(item -> { 
       item.content().put("age", 42); 
       return bucket.async() 
         .replace(item) 
         .retry((count, throwable) -> count == 1 && throwable instanceof CASMismatchException) 
         .onErrorReturn(e -> { 
          errorHandler.handleError(item.id(), e); 
          return null; //or item, if you need the item further down the stream 
         }) 
         .subscribeOn(Schedulers.io()); //not sure if it's needed with bucket.async() 
      }) 
      .subscribeOn(<something>) //with this scheduler the put() method will be executed 
      .subscribe(); 

的想法是通過flatMap()每個項目處理到一個單獨的可觀察分開,因爲每個重試邏輯是單個項目,而不是整個流。 重試運營謂語,讓您重試次數和異常,所以你的情況,我們只與特定CASMismatchException例外重試的第一次,然後錯誤,我們可以簡單地做onErrorReturn和辦理其他錯誤操作,你甚至可以返回該項目如果你想繼續處理它。
有一點需要注意的是調度,我不知道如果Couchbase在默認情況下io()async()通話時操作。同時,考慮到該行:

item.content().put("age", 42); 

將在最後subscribeOn(),因爲它會在主流訂閱調度來完成執行。

+0

感謝您的幫助!我會盡量遵循你的建議。我描述的情況非常簡單,但是關於通過Observable項而不是列出Observable捕獲錯誤的想法是我真正想到的,現在它很可能會解決我的問題。 – blackdigger