我試圖根據某些約束以快速方式刪除一批couchbase文檔(或者如果約束不滿足,則更新文檔)。根據我的術語,每個刪除都被稱爲「包裹」。正確終止卡住的Couchbase Observable
執行時,我遇到了一個非常奇怪的行爲 - 負責此任務的線程開始按照預期進行幾次迭代(充其量)。在此「寬限期」之後,couchbase會「卡住」,並且Observable
在定義的30秒時段內不會調用其任何Subscriber
的方法(onNext
,onComplete
,onError
)。
當latch
超時發生時(見下面的實現),該方法返回,但Observable
繼續執行(我注意到,當它停止打印調試消息停止時,此斷點之外的斷點)。 我懷疑沙發底座卡住了,因爲幾秒鐘後,許多Observable
被留在某種「鬼魂」狀態 - 活着並報告給他們的Subscriber
,而這反過來無關,因爲它們的創建方法已經完成,最終導致java.lang.OutOfMemoryError: GC overhead limit exceeded
。
我不知道我在這裏聲稱是否有道理,但我想不出這種行爲的另一個原因。 我應該如何正確終止超時時間的Observable?我是不是該?任何其他方式?
public List<InfoParcel> upsertParcels(final Collection<InfoParcel> parcels) {
final CountDownLatch latch = new CountDownLatch(parcels.size());
final List<JsonDocument> docRetList = new LinkedList<JsonDocument>();
Observable<JsonDocument> obs = Observable
.from(parcels)
.flatMap(parcel ->
Observable.defer(() ->
{
return bucket.async().get(parcel.key).firstOrDefault(null);
})
.map(doc -> {
// In-memory manipulation of the document
return updateDocs(doc, parcel);
})
.flatMap(doc -> {
boolean shouldDelete = ... // Decide by inner logic
if (shouldDelete) {
if (doc.cas() == 0) {
return Observable.just(doc);
}
return bucket.async().remove(doc);
}
return (doc.cas() == 0 ? bucket.async().insert(doc) : bucket.async().replace(doc));
})
);
obs.subscribe(new Subscriber<JsonDocument>() {
@Override
public void onNext(JsonDocument doc) {
docRetList.add(doc);
latch.countDown();
}
@Override
public void onCompleted() {
// Due to a bug in RxJava, onError()/retryWhen() does not intercept exceptions thrown from within the map/flatMap methods.
// Therefore, we need to recalculate the "conflicted" parcels and send them for update again.
while(latch.getCount() > 0) {
latch.countDown();
}
}
@Override
public void onError(Throwable e) {
// Same reason as above
while (latch.getCount() > 0) {
latch.countDown();
}
}
};
);
latch.await(30, TimeUnit.SECONDS);
// Recalculating remaining failed parcels and returning them for another cycle of this method (there's a loop outside)
}
我不能說Couchbase的一部分,但如果您認爲您遇到了RxJava中的缺陷或性能不足,請參閱RxJava問題列表併發佈一個小代碼snipplet,以幫助我們解決任何問題。 – akarnokd
10x用於響應。 幾個月前,當我發現這個缺陷時,我確實來到了問題列表,發現已經有一個問題打開了,還包括一個修復程序(以代碼段的形式)。我不知道這個修補程序是否已經過測試/發佈,但團隊已經知道它。 – KidCrippler