2016-02-22 59 views
1

我想在RxJava中實現一個處理隊列來下載一些文件。我要下載文件的數量可以達到100左右爲什麼在使用backpreasurebuffer時會遇到MissingBackpreasureException?

一切都在Android上使用RxJava 1.1.1開發

我目前的實施看起來是這樣的:

PublishSubject<URL> publishSubject = PublishSubject.create(); 
_subject = new SerializedSubject<>(publishSubject); 

_subscription = _subject 
       .subscribeOn(Schedulers.io()) 
       .observeOn(Schedulers.io()) 
       .subscribe(_getObserver()); // Observer 

_subject.onBackpressureBuffer(10000, new Action0() { 
    @Override 
    public void call() { 
     Log.d(TAG, "onBackpressureBuffer called"); 
     return; 
    } 
}); 

// download a file 
_subject.onNext(aValidURL); 

_getObserver()返回一個新的觀察對象,該對象在「onNext」方法中下載文件。

但是,我的問題是,我很快得到了MissingBackpreasureException,我不明白。我試圖實施backpreasurebuffer,但它似乎並沒有被調用。

我在做什麼錯?

+0

按照問題中的鏈接https://github.com/ReactiveX/RxJava/issues/2453 – CandleCoder

+0

@CandleCoder我不明白。 Backpreasure在我的代碼的其他領域工作。問題只出現在這個特定的情況下,我懷疑它與我使用PublishSubject的事實有關 - 但這只是一個瘋狂的猜測。就我所知,鏈接中的問題非常普遍,甚至不是真正的問題。 –

+0

我的理解是,考慮到他們擁有的外部驅動行爲,主體固有地不支持背壓。也許你對onBackPressureBuffer調用感興趣? –

回答

2

在RxJava中,當您應用運算符時,會得到一個新的Observable實例,其修改後的行爲卻保持不變。在這裏,你在_subject上調用onBackpressureBuffer,但是不要使用它的結果,否則這個調用什麼都不做。你需要應用順序:

PublishSubject<URL> publishSubject = PublishSubject.create(); 
_subject = new SerializedSubject<>(publishSubject); 

_subscription = _subject 
       .onBackpressureBuffer(10000, new Action0() { 
        @Override 
        public void call() { 
         Log.d(TAG, "onBackpressureBuffer called"); 
         return; 
        } 
       }) 
       .subscribeOn(Schedulers.io()) 
       .observeOn(Schedulers.io()) 
       .subscribe(_getObserver()); // Observer 

// download a file 
_subject.onNext(aValidURL); 
+0

我(強烈的)認爲我在某個時候測試過了。但是,現在我遇到了一些問題,用你的方法來重現我的例外。當我做了更多的測試後,我會盡快回復你。到目前爲止,謝謝。 –

相關問題