我在探索響應式編程和RxJava。這很有趣,但我陷入了一個無法找到答案的問題。我的基本問題:什麼是反應適當的方式來終止一個無限運行的Observable?我也歡迎有關我的代碼的批評和反應性最佳實踐。RxJava - 終止無限流
作爲練習,我正在寫一個日誌文件尾部實用程序。日誌文件中的行流由Observable<String>
表示。要使BufferedReader
繼續讀取添加到文件中的文本,我忽略了通常的reader.readLine() == null
終止檢查,而是將其解釋爲意味着我的線程應該休眠並等待更多記錄器文本。
但是,雖然我可以使用takeUntil
終止觀察者,但我需要找到一種乾淨的方式來終止無限運行的文件觀察器。我可以編寫我自己的terminateWatcher
方法/字段,但打破了Observable/Observer封裝 - 我想盡可能保持對被動範例的嚴格要求。
這裏是Observable<String>
代碼:
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) != null)
observer.onNext(newLine);
else
try {
// Wait for more text
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return null; // Not sure what Subscription I should return
}
}
這裏是觀測代碼打印新的生產線,因爲他們來:
public static void main(String... args) {
. . .
Observable<String> lines = Observable.create(createWatcher(file));
lines = lines.takeWhile(new Func1<String, Boolean>() {
@Override
public Boolean call(String line) {
// Predicate for which to continue processing
return !line.contains("shutdown");
}
}).subscribeOn(Schedulers.threadPoolForIO())
.observeOn(Schedulers.currentThread());
// Seems like I should use subscribeOn() and observeOn(), but they
// make my tailer terminate without reading any text.
Subscription subscription = lines.subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.printf("%20s\t%s\n", file, line);
}
});
}
我的兩個問題是:
- 什麼是一種反應一致的方式來終止一個否則無限運行的流?
- 我的代碼中還有哪些錯誤會讓你哭泣? :)
因此,調度程序可以定義應該讀取日誌文件的頻率;那麼當調度程序運行FileWatcher時,FileWatcher會拉出行直到readLine()== null。然後觀察者可以像正常一樣取消訂閱,或使用'takeUntil()'自動取消訂閱。我將不得不研究這些方法,看看它是否有效,但我喜歡這個想法。 – MonkeyWithDarts