我試圖讓一個非常基本的基於RxJava的應用程序工作。我已經定義了以下可觀察類讀取和文件返回行:RxJava和rx.exceptions.MissingBackpressureException異常
public Observable<String> getObservable() throws IOException
{
return Observable.create(subscribe -> {
InputStream in = getClass().getResourceAsStream("/trial.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line = null;
try {
while((line = reader.readLine()) != null)
{
subscribe.onNext(line);
}
} catch (IOException e) {
subscribe.onError(e);
}
finally {
subscribe.onCompleted();
}
});
}
接下來,我已經定義了subscrober代碼:
public static void main(String[] args) throws IOException, InterruptedException {
Thread thread = new Thread(() ->
{
RxObserver observer = new RxObserver();
try {
observer.getObservable()
.observeOn(Schedulers.io())
.subscribe(x ->System.out.println(x),
t -> System.out.println(t),
() -> System.out.println("Completed"));
} catch (IOException e) {
e.printStackTrace();
}
});
thread.start();
thread.join();
}
文件已接近50000分的記錄。運行應用程序時,我得到「rx.exceptions.MissingBackpressureException」。我已經通過了一些文檔,並按照建議,我嘗試在調用鏈中添加「.onBackpressureBuffer()」方法。但是,我沒有得到例外,但完成的呼叫也不會被解僱。
什麼是正確的方式來處理情況,我們有一個快速生產Observable?
SyncOnSubscribe是最新的RxJava包的一部分嗎?我有RxJava框架的1.0.8版本? –
明白了。包含最新版本1.1.0,並且包含SyncOnSubscribe類。 –