2015-12-27 25 views
3

我試圖根據某些約束以快速方式刪除一批couchbase文檔(或者如果約束不滿足,則更新文檔)。根據我的術語,每個刪除都被稱爲「包裹」。正確終止卡住的Couchbase Observable

執行時,我遇到了一個非常奇怪的行爲 - 負責此任務的線程開始按照預期進行幾次迭代(充其量)。在此「寬限期」之後,couchbase會「卡住」,並且Observable在定義的30秒時段內不會調用其任何Subscriber的方法(onNextonCompleteonError)。

latch超時發生時(見下面的實現),該方法返回,但Observable繼續執行(我注意到,當它停止打印調試消息停止時,此斷點之外的斷點)。 我懷疑沙發底座卡住了,因爲幾秒鐘後,許多Observable被留在某種「鬼魂」狀態 - 活着並報告給他們的Subscriber,而這反過來無關,因爲它們的創建方法已經完成,最終導致java.lang.OutOfMemoryError: GC overhead limit exceeded

我不知道我在這裏聲稱是否有道理,但我想不出這種行爲的另一個原因。 我應該如何正確終止超時時間的Observable?我是不是該?任何其他方式?

public List<InfoParcel> upsertParcels(final Collection<InfoParcel> parcels) { 
    final CountDownLatch latch = new CountDownLatch(parcels.size()); 

    final List<JsonDocument> docRetList = new LinkedList<JsonDocument>(); 
    Observable<JsonDocument> obs = Observable 
      .from(parcels) 
      .flatMap(parcel -> 
         Observable.defer(() -> 
          { 
           return bucket.async().get(parcel.key).firstOrDefault(null); 
          }) 
          .map(doc -> { 
           // In-memory manipulation of the document 
           return updateDocs(doc, parcel); 
          }) 
          .flatMap(doc -> { 
           boolean shouldDelete = ... // Decide by inner logic 
           if (shouldDelete) { 
            if (doc.cas() == 0) { 
             return Observable.just(doc); 
            } 
            return bucket.async().remove(doc); 
           } 
           return (doc.cas() == 0 ? bucket.async().insert(doc) : bucket.async().replace(doc)); 
          }) 
      ); 

    obs.subscribe(new Subscriber<JsonDocument>() { 
       @Override 
       public void onNext(JsonDocument doc) { 
        docRetList.add(doc); 
        latch.countDown(); 
       } 

       @Override 
       public void onCompleted() { 
        // Due to a bug in RxJava, onError()/retryWhen() does not intercept exceptions thrown from within the map/flatMap methods. 
        // Therefore, we need to recalculate the "conflicted" parcels and send them for update again. 
        while(latch.getCount() > 0) { 
         latch.countDown(); 
        } 
       } 

       @Override 
       public void onError(Throwable e) { 
        // Same reason as above 
        while (latch.getCount() > 0) { 
         latch.countDown(); 
        } 
       } 
      }; 
    ); 

    latch.await(30, TimeUnit.SECONDS); 

    // Recalculating remaining failed parcels and returning them for another cycle of this method (there's a loop outside) 
} 
+0

我不能說Couchbase的一部分,但如果您認爲您遇到了RxJava中的缺陷或性能不足,請參閱RxJava問題列表併發佈一個小代碼snipplet,以幫助我們解決任何問題。 – akarnokd

+0

10x用於響應。 幾個月前,當我發現這個缺陷時,我確實來到了問題列表,發現已經有一個問題打開了,還包括一個修復程序(以代碼段的形式)。我不知道這個修補程序是否已經過測試/發佈,但團隊已經知道它。 – KidCrippler

回答

0

我認爲這確實是由於這樣的事實,使用倒計時閂沒有信號的數據處理流程應停止源。

您可以使用更多的rxjava,通過使用toList().timeout(30, TimeUnit.SECONDS).toBlocking().single()而不是收集(未同步且因此不安全)的外部列表以及使用倒數鎖定。

這將阻塞,直到您的文件列表返回。

+0

之前我們使用過這種方法,因爲與BlockingObservable相關的性能問題,我們必須將所有代碼轉換爲基於鎖存的代碼。我非常確定我在coucuhbase文檔中讀到了BlockingObservables不推薦用於生產環境。 順便說一下,我的列表是本地每個線程,爲什麼它不安全? – KidCrippler

+1

它可能是不安全的,因爲添加到列表是異步完成的,最有可能來自另一個線程。如果你想保持鎖定狀態,你是否試圖保持訂閱(訂閱)的結果並在鎖存超時時調用取消訂閱? –

+0

這實際上是我嘗試過的一個好主意。我同步訪問列表(在onNext()方法中)並在鎖定超時後取消訂閱。不幸的是,我仍然遇到同樣的問題。 – KidCrippler