1
我正在用Socket.io來實現RxJava。我有一個問題再次訂閱相同的observable,所以我做了一個例子和一些清晰的代碼來理解。RxJava在訂閱後繼續發射
這裏是功能:
private void testObservable() throws InterruptedException {
io.reactivex.Observable observable = io.reactivex.Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
for (int i = 0; i < 20; i++) {
java.lang.Thread.sleep(100);
e.onNext(i);
Timber.d("TESTOBSERVER emit %d", i);
}
e.onComplete();
}
});
DisposableObserver observer = new DisposableObserver<Integer>() {
@Override
public void onNext(Integer value) {
Timber.d("TESTOBSERVER observed: %d", value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(observer);
Timber.d("TESTOBSERVER subscribe 1");
java.lang.Thread.sleep(500);
observer.dispose();
Timber.d("TESTOBSERVER dispose 1");
java.lang.Thread.sleep(500);
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(observer);
Timber.d("TESTOBSERVER subscribe 2");
java.lang.Thread.sleep(500);
observer.dispose();
Timber.d("TESTOBSERVER dispose 2");
}
,這裏是它所打印的
06-07 15:02:49.851 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER subscribe 1
06-07 15:02:49.951 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 0
06-07 15:02:50.051 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 1
06-07 15:02:50.161 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 2
06-07 15:02:50.261 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 3
06-07 15:02:50.351 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER dispose 1
06-07 15:02:50.861 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER subscribe 2
06-07 15:02:51.361 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER dispose 2
我有兩個問題:
- 爲什麼不能打印出
TESTOBSERVER observed: 1
? - 爲什麼在第二次訂閱後,它不會繼續發射9,10,11,12,13?
我承諾第二回答,但是請您詳細說明一日。 –
像「AndroidSchedulers.mainThread()」這樣的調度程序可以在該線程完成當前正在執行的任務時工作。如果您在該線程上調用'testObservable()'並使用'Thread.sleep'來阻塞,那麼在調用'Observer.dispose()'之後,該線程永遠無法用於這些預定操作。在那時它永遠不會調用onNext – Kiskae