我正在嘗試使用RxJava編寫一個簡單的程序來生成無限序列的自然數。所以,我已經找到了兩種使用Observable.timer()和Observable.interval()生成數字序列的方法。我不確定這些功能是否是解決這個問題的正確方法。我期待着一個簡單的函數,就像我們在Java 8中生成的函數一樣,可以生成無限自然數。使用RxJava生成無限序列的自然數
IntStream.iterate(1,value - > value +1).forEach(System.out :: println);
我嘗試使用IntStream與Observable但不能正常工作。它僅向第一個用戶發送無限數量的數字流。我怎樣才能正確生成無限自然數序列?
import rx.Observable;
import rx.functions.Action1;
import java.util.stream.IntStream;
public class NaturalNumbers {
public static void main(String[] args) {
Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
IntStream stream = IntStream.iterate(1, val -> val + 1);
stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber));
});
Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber);
Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber);
Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber);
naturalNumbers.subscribe(first);
naturalNumbers.subscribe(second);
naturalNumbers.subscribe(third);
}
}
感謝邁克你的答案。如果我在創建Observable的時候調用subscribeOn方法,而不是像上面的代碼片段所示的那樣調用它三次,它會有什麼不同。我測試了它,行爲相同但仍想確認。 – Shekhar
這個問題被正確識別,但這是不好的建議 - 你不應該使用'subscribeOn'解決這個問題 - 請參閱我的答案爲什麼。 –
以這種方式調用'unsubscribe'會斷開用戶連接,因此它會停止接收消息,但它不會停止發生器的循環,這會持續無限地佔用CPU的能量。請參閱我的回答,瞭解如何解決故事的兩個方面。 –