我想在RxJava中實現一個處理隊列來下載一些文件。我要下載文件的數量可以達到100左右爲什麼在使用backpreasurebuffer時會遇到MissingBackpreasureException?
一切都在Android上使用RxJava 1.1.1開發
我目前的實施看起來是這樣的:
PublishSubject<URL> publishSubject = PublishSubject.create();
_subject = new SerializedSubject<>(publishSubject);
_subscription = _subject
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(_getObserver()); // Observer
_subject.onBackpressureBuffer(10000, new Action0() {
@Override
public void call() {
Log.d(TAG, "onBackpressureBuffer called");
return;
}
});
// download a file
_subject.onNext(aValidURL);
凡_getObserver()
返回一個新的觀察對象,該對象在「onNext」方法中下載文件。
但是,我的問題是,我很快得到了MissingBackpreasureException
,我不明白。我試圖實施backpreasurebuffer
,但它似乎並沒有被調用。
我在做什麼錯?
按照問題中的鏈接https://github.com/ReactiveX/RxJava/issues/2453 – CandleCoder
@CandleCoder我不明白。 Backpreasure在我的代碼的其他領域工作。問題只出現在這個特定的情況下,我懷疑它與我使用PublishSubject的事實有關 - 但這只是一個瘋狂的猜測。就我所知,鏈接中的問題非常普遍,甚至不是真正的問題。 –
我的理解是,考慮到他們擁有的外部驅動行爲,主體固有地不支持背壓。也許你對onBackPressureBuffer調用感興趣? –