2017-04-16 84 views
5

我有以下代碼:RxJava:觀察到的,默認的線程

Observable.create(new ObservableOnSubscribe<String>() { 
      @Override 
      public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception { 
       Thread thread = new Thread(new Runnable() { 
        @Override 
        public void run() { 
         s.onNext("1"); 
         s.onComplete(); 
        } 
       }); 
       thread.setName("background-thread-1"); 
       thread.start(); 
      } 
     }).map(new Function<String, String>() { 
      @Override 
      public String apply(@NonNull String s) throws Exception { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("map: thread=" + threadName); 
       return "map-" + s; 
      } 
     }).subscribe(new Observer<String>() { 
      @Override 
      public void onSubscribe(Disposable d) {} 

      @Override 
      public void onNext(String s) { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("onNext: thread=" + threadName + ", value=" + s); 
      } 

      @Override 
      public void onError(Throwable e) {} 

      @Override 
      public void onComplete() { 
       String threadName = Thread.currentThread().getName(); 
       logger.logDebug("onComplete: thread=" + threadName); 
      } 
     }); 

而這裏的輸出:

map: thread=background-thread-1 
onNext: thread=background-thread-1, value=map-1 
onComplete: thread=background-thread-1 

重要的細節:我打電話從另一個線程subscribe方法(main線程在Android中)。

所以看起來像Observable類是同步的,在默認情況下,它發出事件(s.onNext)在同一個線程執行的一切(運營商如map +通知用戶),對不對?我想知道......它是有意的行爲還是我誤解了某些東西?其實我期望至少onNextonComplete回調將在調用者的線程上被調用,而不是在發送事件的那個上。我是否正確理解在這種特殊情況下實際的調用者線程無關緊要?至少在異步生成事件時。

另一個值得關注的 - 如果我收到了一些可觀察到從一些外部源的參數(即我不產生對我自己)......有沒有辦法對我來說它的用戶檢查其是否是同步或異步,我只需明確指定我想通過subscribeOnobserveOn方法接收回調的位置,對吧?

謝謝!

回答

4

RxJava是unopinionated關於併發。如果您不使用observeOn/subscribeOn等任何其他mechanisem,它將在訂閱線程上產生值。請不要在運營商中使用Thread等低級構造,否則可能會破壞合同。

由於使用線程時,onNext將從調用線程調用(「後臺線程1」)。訂閱發生在調用(UI-Thread)上。鏈中的每個運算符都將從'background-thread-1'調用線程調用。訂閱onNext也將從'background-thread-1'被調用。

如果要產生不上調用線程使用的值:subscribeOn。如果你想把線程切換回主要用途observeOn鏈中的某個地方。最有可能在訂閱它之前。

實施例:

Observable.just(1,2,3) // creation of observable happens on Computational-Threads 
      .subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Last will overwrite 
      .map(integer -> integer) // map happens on Computational-Threads 
      .observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread 
      .subscribe(integer -> { 
       // called from mainThread 
      }); 

這裏是一個很好explanitation。 http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html