2016-01-02 42 views
2

我試圖讓一個非常基本的基於RxJava的應用程序工作。我已經定義了以下可觀察類讀取和文件返回行:RxJava和rx.exceptions.MissingBackpressureException異常

public Observable<String> getObservable() throws IOException 
    { 
     return Observable.create(subscribe -> { 
      InputStream in = getClass().getResourceAsStream("/trial.txt"); 
      BufferedReader reader = new BufferedReader(new InputStreamReader(in)); 
      String line = null; 
      try { 
       while((line = reader.readLine()) != null) 
       { 
        subscribe.onNext(line); 
       } 
      } catch (IOException e) { 
       subscribe.onError(e); 
      } 
      finally { 
       subscribe.onCompleted(); 
      } 
     }); 
    } 

接下來,我已經定義了subscrober代碼:

public static void main(String[] args) throws IOException, InterruptedException { 
     Thread thread = new Thread(() -> 
     { 
      RxObserver observer = new RxObserver(); 
      try { 
       observer.getObservable() 
         .observeOn(Schedulers.io()) 
         .subscribe(x ->System.out.println(x), 
            t -> System.out.println(t), 
            () -> System.out.println("Completed")); 

      } catch (IOException e) { 
       e.printStackTrace(); 
      } 
     }); 

     thread.start(); 
     thread.join(); 
    } 

文件已接近50000分的記錄。運行應用程序時,我得到「rx.exceptions.MissingBackpressureException」。我已經通過了一些文檔,並按照建議,我嘗試在調用鏈中添加「.onBackpressureBuffer()」方法。但是,我沒有得到例外,但完成的呼叫也不會被解僱。

什麼是正確的方式來處理情況,我們有一個快速生產Observable?

回答

2

的第一個問題是,你的readLine邏輯忽略背壓。您可以申請onBackpressureBuffer()只是observeOn之前下手,但有一個最近除了SyncOnSubscribe,它可以讓你產生價值逐一採取背壓的護理:

SyncOnSubscribe.createSingleState(() => { 
    try { 
     InputStream in = getClass().getResourceAsStream("/trial.txt"); 
     return new BufferedReader(new InputStreamReader(in)); 
    } catch (IOException ex) { 
     throw new RuntimeException(ex); 
    } 
}, 
(s, o) -> { 
    try { 
     String line = s.readLine(); 
     if (line == null) { 
      o.onCompleted(); 
     } else { 
      o.onNext(line); 
     } 
    } catch (IOException ex) { 
     s.onError(ex); 
    } 
}, 
s -> { 
    try { 
     s.close(); 
    } catch (IOException ex) { 
    } 
}); 

的第二個問題是,你的線程將完成前路一切io線程上的元素已經交付,因此主程序退出。刪除observeOn,添加.toBlocking或使用CountDownLatch

RxObserver observer = new RxObserver(); 
try { 

    CountDownLatch cdl = new CountDownLatch(1); 

    observer.getObservable() 
      .observeOn(Schedulers.io()) 
      .subscribe(x ->System.out.println(x), 
         t -> { System.out.println(t); cdl.countDown(); }, 
         () -> { System.out.println("Completed"); cdl.countDown(); }); 

    cdl.await(); 
} catch (IOException | InterruptedException e) { 
    e.printStackTrace(); 
} 
+0

SyncOnSubscribe是最新的RxJava包的一部分嗎?我有RxJava框架的1.0.8版本? –

+0

明白了。包含最新版本1.1.0,並且包含SyncOnSubscribe類。 –

1

這裏的問題是observeOn運營商,因爲每一個觀看的onNext()調用被安排在一個單獨的線程中調用,您可觀察到持續不論用戶(observeOn)容量產生在一個循環的排定呼叫。

如果保持同步,Observable將不會發出下一個元素,直到用戶完成前一個元素爲止,因爲它全部在一個線程上完成,並且不會再有背壓問題。

如果你仍然想使用observeOn,你將必須實現背壓邏輯在可觀測的OnSubscribe#調用方法