2013-12-18 46 views
9

我在探索響應式編程和RxJava。這很有趣,但我陷入了一個無法找到答案的問題。我的基本問題:什麼是反應適當的方式來終止一個無限運行的Observable?我也歡迎有關我的代碼的批評和反應性最佳實踐。RxJava - 終止無限流

作爲練習,我正在寫一個日誌文件尾部實用程序。日誌文件中的行流由Observable<String>表示。要使BufferedReader繼續讀取添加到文件中的文本,我忽略了通常的reader.readLine() == null終止檢查,而是將其解釋爲意味着我的線程應該休眠並等待更多記錄器文本。

但是,雖然我可以使用takeUntil終止觀察者,但我需要找到一種乾淨的方式來終止無限運行的文件觀察器。我可以編寫我自己的terminateWatcher方法/字段,但打破了Observable/Observer封裝 - 我想盡可能保持對被動範例的嚴格要求。

這裏是Observable<String>代碼:

public class FileWatcher implements OnSubscribeFunc<String> { 
    private Path path = . . .; 

    @Override 
    // The <? super String> generic is pointless but required by the compiler 
    public Subscription onSubscribe(Observer<? super String> observer) { 
     try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) { 
      String newLine = ""; 
      while (!Thread.interrupted()) { // How do I terminate this reactively? 
       if ((newLine = reader.readLine()) != null) 
        observer.onNext(newLine); 
       else 
        try { 
         // Wait for more text 
         Thread.sleep(250); 
        } catch (InterruptedException e) { 
         Thread.currentThread().interrupt(); 
        } 
      } 
      observer.onCompleted(); 
     } catch (Exception e) { 
      observer.onError(e); 
     } 

     return null; // Not sure what Subscription I should return 
    } 
} 

這裏是觀測代碼打印新的生產線,因爲他們來:

public static void main(String... args) { 
    . . . 
    Observable<String> lines = Observable.create(createWatcher(file)); 
    lines = lines.takeWhile(new Func1<String, Boolean>() { 
     @Override 
     public Boolean call(String line) { 
      // Predicate for which to continue processing 
      return !line.contains("shutdown"); 
     } 
    }).subscribeOn(Schedulers.threadPoolForIO()) 
      .observeOn(Schedulers.currentThread()); 
    // Seems like I should use subscribeOn() and observeOn(), but they 
    // make my tailer terminate without reading any text. 

    Subscription subscription = lines.subscribe(new Action1<String>() { 
     @Override 
     public void call(String line) { 
      System.out.printf("%20s\t%s\n", file, line); 
     } 
    }); 
} 

我的兩個問題是:

  1. 什麼是一種反應一致的方式來終止一個否則無限運行的流?
  2. 我的代碼中還有哪些錯誤會讓你哭泣? :)

回答

6

由於您是在請求訂閱時啓動文件,所以在訂閱結束時(即Subscription關閉時)終止它是有意義的。這解決了您的代碼註釋中一個鬆散的結尾:您返回一個Subscription告訴FileWatcher停止觀看該文件。然後替換您的循環條件,以便它檢查訂閱是否已被取消,而不是檢查當前線程是否已被中斷。

問題是,如果您的onSubscribe()方法永不返回,那不是非常有用。也許你的FileWatcher應該要求指定一個調度程序,讀取發生在該調度程序上。

+0

因此,調度程序可以定義應該讀取日誌文件的頻率;那麼當調度程序運行FileWatcher時,FileWatcher會拉出行直到readLine()== null。然後觀察者可以像正常一樣取消訂閱,或使用'takeUntil()'自動取消訂閱。我將不得不研究這些方法,看看它是否有效,但我喜歡這個想法。 – MonkeyWithDarts