2017-06-07 116 views
1

我正在用Socket.io來實現RxJava。我有一個問題再次訂閱相同的observable,所以我做了一個例子和一些清晰的代碼來理解。RxJava在訂閱後繼續發射

這裏是功能:

private void testObservable() throws InterruptedException { 
    io.reactivex.Observable observable = io.reactivex.Observable.create(new ObservableOnSubscribe() { 
     @Override 
     public void subscribe(ObservableEmitter e) throws Exception { 
      for (int i = 0; i < 20; i++) { 
       java.lang.Thread.sleep(100); 
       e.onNext(i); 
       Timber.d("TESTOBSERVER emit %d", i); 
      } 
      e.onComplete(); 
     } 
    }); 
    DisposableObserver observer = new DisposableObserver<Integer>() { 
     @Override 
     public void onNext(Integer value) { 
      Timber.d("TESTOBSERVER observed: %d", value); 
     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onComplete() { 

     } 
    }; 
    observable.subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribeWith(observer); 
    Timber.d("TESTOBSERVER subscribe 1"); 

    java.lang.Thread.sleep(500); 
    observer.dispose(); 

    Timber.d("TESTOBSERVER dispose 1"); 

    java.lang.Thread.sleep(500); 
    observable.subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribeWith(observer); 
    Timber.d("TESTOBSERVER subscribe 2"); 
    java.lang.Thread.sleep(500); 
    observer.dispose(); 
    Timber.d("TESTOBSERVER dispose 2"); 
} 

,這裏是它所打印的

06-07 15:02:49.851 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER subscribe 1 
06-07 15:02:49.951 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 0 
06-07 15:02:50.051 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 1 
06-07 15:02:50.161 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 2 
06-07 15:02:50.261 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 3 
06-07 15:02:50.351 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER dispose 1 
06-07 15:02:50.861 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER subscribe 2 
06-07 15:02:51.361 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER dispose 2 

我有兩個問題:

  1. 爲什麼不能打印出TESTOBSERVER observed: 1
  2. 爲什麼在第二次訂閱後,它不會繼續發射9,10,11,12,13?

回答

2
  1. 爲什麼不能打印出TESTOBSERVER觀察:1?

推測是因爲testObservable()正在android主線程上執行。所以在取消之前從來沒有機會使用AndroidSchedulers.mainThread()來觀察值。

  1. 爲什麼在第二次訂閱後它不會繼續發射9,10,11,12,13?

因爲觀察對象不是有狀態的。每個.subscribe()調用都是獨立的。 它不發射任何東西的原因是因爲Observer不能被重複使用。所以既然它已經被認購和處置了,它就不會訂閱這個可觀察的。

+0

我承諾第二回答,但是請您詳細說明一日。 –

+0

像「AndroidSchedulers.mainThread()」這樣的調度程序可以在該線程完成當前正在執行的任務時工作。如果您在該線程上調用'testObservable()'並使用'Thread.sleep'來阻塞,那麼在調用'Observer.dispose()'之後,該線程永遠無法用於這些預定操作。在那時它永遠不會調用onNext – Kiskae