2
我一直在努力通過Reactive Programming with RxJava這本書的例子,該書的目標不是版本1而是2.對無限流的介紹有以下示例(並指出有更好的方法來處理併發):RxJava 2相當於isUnsubscribed
Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
Runnabler =() -> {
BigInteger i = ZERO;
while (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
i = i.add(ONE);
}
};
new Thread(r).start();
});
...
Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();
然而,在RxJava 2,傳遞給create()
方法lambda表達式是ObservableEmitter
類型的,並且這不具有isUnsubscribed()
方法。我看了一下What's Different in 2.0,同時也搜索了版本庫,但是找不到任何這樣的方法。
如何在2.0中實現相同的功能?
爲如下(注意,使用科特林)編輯,包括解決方案:
val naturalNumbers = Observable.create<BigInteger> { emitter ->
Thread({
var int: BigInteger = BigInteger.ZERO
while (!emitter.isDisposed) {
emitter.onNext(int)
int = int.add(BigInteger.ONE)
}
}).start()
}
val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }
Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()
太棒了,那就是訣竅。謝謝。 – amb85