2015-04-02 40 views
0

我正在使用新的Couchbase Java Client API 2.1.1,因此JavaRx訪問我的Couchbase集羣。
在已鎖定的文檔上使用異步getAndLock時,getAndLock將失敗,並顯示TemporaryLockFailureException。在另一個SO問題(rxjava: Can I use retry() but with delay?)中,我發現瞭如何延遲重試。正確的方式重試延遲Couchbase getAndLock如果鎖已被佔用?

這是我通過代碼:

CountDownLatchWithResultData<JsonDocument> resultCdl = new CountDownLatchWithResultData<>(1); 

    couchbaseBucket.async().getAndLock(key, LOCK_TIME).retryWhen((errorObserver) -> { 
     return errorObserver.flatMap((Throwable t) -> { 
      if (t instanceof TemporaryLockFailureException) { 
       return Observable.timer(RETRY_DELAY_MS, TimeUnit.MILLISECONDS); 
      } 
      return Observable.error(t); 
     }); 
    }).subscribe(new Subscriber<JsonDocument>() { 

     @Override 
     public void onCompleted() { 
      resultCdl.countDown(); 
     } 

     @Override 
     public void onError(Throwable e) { 
      resultCdl.countDown(); 
     } 

     @Override 
     public void onNext(JsonDocument t) { 
      resultCdl.setInformation(t); 
     } 

    }); 

    ........ 

    resultCdl.await(); 

    if (resultCdl.getInformation() == null) { 
     //do stuff 
    } else .... 

CountDownLatchWithResultData簡單地延長正常CountDownLatch,並增加了兩種方法來存儲一些信息的計數已達到0之前和之後檢索)

所以基本上我'd like this code to

  • 嘗試無限次獲得鎖定每RETRY_DELAY_MS毫秒如果TemporaryLockFailureException發生,然後調用onNext
  • 或如果在所有

的問題也不例外,現在是時候重新嘗試,僅僅重試一次,在其他異常

  • 或直接致電onNext完全失敗即使文檔存在,在這種情況下,resultCdl.getInformation()的JsonDocument始終爲空。看起來onNext從不叫。
    如果沒有異常,代碼工作正常。
    顯然,我在這裏做錯了事,但我不知道問題出在哪裏。是否返回Observable.timer意味着用這個新的Obervable也會重新執行先前關聯的retryWhen?計數爲1的是CountDownLatch

  • +0

    我嘗試在沒有couchbase的情況下模擬您的問題,並且只使用發出值或錯誤的簡單Observable。我沒有重現它。你有沒有把一個斷點放入onNext()來檢查它是否被調用? – dwursteisen 2015-04-02 14:46:57

    回答

    0

    這是一個微妙的。截至版本2.2.0,來自SDK的Observable都處於「熱門」類別。實際上這意味着即使沒有訂閱,他們也開始發佈。他們也會將相同的數據發送到每個新手Subscriber,所以實際上他們會緩存數據。

    因此,您重試的是重新訂閱Observable,它將始終發出相同的東西(在本例中爲錯誤)。我懷疑它散發出來的重試循環,只是因爲鎖最大持續時間爲LOCK_TIME的...

    嘗試包裹的Observable.defer內調用asyncBucket.getAndLock(或遷移到2.2.x的SDK,如果這件事情你可以做,看到release and migration notes starting from 2.2.0)。