2017-06-13 59 views
2

我一直在努力通過Reactive Programming with RxJava這本書的例子,該書的目標不是版本1而是2.對無限流的介紹有以下示例(並指出有更好的方法來處理併發):RxJava 2相當於isUnsubscribed

Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> { 
    Runnabler =() -> { 
     BigInteger i = ZERO; 
     while (!subscriber.isUnsubscribed()) { 
      subscriber.onNext(i); 
      i = i.add(ONE); 
     } 
    }; 
    new Thread(r).start(); 
}); 

... 

Subscription subscription = naturalNumbers.subscribe(x -> log(x)); 
/* after some time... */ 
subscription.unsubscribe(); 

然而,在RxJava 2,傳遞給create()方法lambda表達式是ObservableEmitter類型的,並且這不具有isUnsubscribed()方法。我看了一下What's Different in 2.0,同時也搜索了版本庫,但是找不到任何這樣的方法。

如何在2.0中實現相同的功能?

爲如下(注意,使用科特林)編輯,包括解決方案:

val naturalNumbers = Observable.create<BigInteger> { emitter -> 
    Thread({ 
     var int: BigInteger = BigInteger.ZERO 
     while (!emitter.isDisposed) { 
      emitter.onNext(int) 
      int = int.add(BigInteger.ONE) 
     } 
    }).start() 
} 

val first = naturalNumbers.subscribe { log("First: $it") } 
val second = naturalNumbers.subscribe { log("Second: $it") } 

Thread.sleep(5) 
first.dispose() 
Thread.sleep(5) 
second.dispose() 

回答

2

後您訂閱觀測到的,則返回Disposable。您可以將它保存到您的本地變量並檢查disposable.isDisposed()以查看它是否仍訂閱。

+1

太棒了,那就是訣竅。謝謝。 – amb85