2016-11-02 34 views
1

我想了解rxjava如何合併工作。因此,這裏是簡單的代碼應該合併來自2個觀測結果和發射到訂戶RxJava合併用戶只能從第一個可觀察到的結果獲得結果

Observable.merge(getObservable(), getTimedObservable()) 
       .subscribeOn(Schedulers.io()) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .subscribe(new Action1<String>() { 
        @Override public void call(final String s) { 
         Log.i("test", s); 
        } 
       }); 

    private Observable<String> getTimedObservable() { 
     return Observable.interval(150, TimeUnit.MILLISECONDS) 
       .map(new Func1<Long, String>() { 
        @Override public String call(final Long aLong) { 
         Log.i("test", "tick thread: " + Thread.currentThread().getId()); 
         return String.valueOf(aLong); 
        } 
       }); 
    } 

    public Observable<String> getObservable() { 
     return Observable.create(new Observable.OnSubscribe<String>() { 
      @Override public void call(final Subscriber<? super String> subscriber) { 
       try { 
        Log.i("test", "simple observable thread: " + Thread.currentThread().getId()); 
        for (int i = 1; i <= 10; i++) { 
         subscriber.onNext(String.valueOf(i * 100)); 
         Thread.sleep(300); 
        } 
        subscriber.onCompleted(); 
       } catch (Exception e) { 
        subscriber.onError(e); 
       } 
      } 
     }); 
    } 

我預計,合併的結果在訂戶會像

或類似的東西,但是,實際結果是:

test: simple observable thread: 257 
test: 100 
test: 200 
test: 300 
test: 400 
test: 500 
test: 600 
test: 700 
test: 800 
test: 900 
test: 1000 
test: tick thread: 254 
test: 0 
test: tick thread: 254 
test: 1 
test: tick thread: 254 
test: 2 
test: tick thread: 254 
test: 3 
test: tick thread: 254 
test: 4 
test: tick thread: 254 
test: 5 
test: tick thread: 254 
test: 6 
test: tick thread: 254 
test: 7 
test: tick thread: 254 
test: 8 
test: tick thread: 254 
test: 9 
test: tick thread: 254 
test: 10 
test: tick thread: 254 
test: 11 
test: tick thread: 254 
test: 12 
test: tick thread: 254 
test: 13 

它看起來像Thread.sleep在第一個Observable塊發射第二個可觀察,但我不明白如何。有人可以解釋嗎?

回答

3

合併將同時訂閱兩個觀察值。將首先訂閱的observable將在調用線程上產生值。由於調用線程被observable1阻塞,observable2不能生成值。 SubscribeOn只會說訂閱將在哪裏發生。讓我們說可觀察的開始在main-1上產生值。下游的每個價值將在同一個線程上。沒有併發發生。

如果你想達到併發性,你必須說每個可觀察的,訂閱必須發生的地方。所以我們可以說我們有兩個可觀察的Observables.merge。 Observable1和Observable2已經與一些Threadpool進行了訂閱。每個observable將在給定的subscribeOn線程上生成值。你實現了併發。

Plase看看編輯的產量爲:

@Test 
public void name() throws Exception { 
    Subscription subscribe = Observable.merge(getObservable(), getTimedObservable()) 
      //.observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(s -> { 

       System.out.println("subscription " + s); 
       //Log.i("test", s); 
      }); 


    Thread.sleep(5_000); 
} 

private Observable<String> getTimedObservable() { 
    return Observable.interval(150, TimeUnit.MILLISECONDS) 
      .map(aLong -> { 
       System.out.println("getTimedObservable: " + Thread.currentThread().getId()); 

       //Log.i("test", "tick thread: " + Thread.currentThread().getId()); 
       return String.valueOf(aLong); 
      }).subscribeOn(Schedulers.io()); 
} 

private Observable<String> getObservable() { 
    return Observable.<String>create(subscriber -> { 
     try { 
      for (int i = 1; i <= 10; i++) { 
       System.out.println("getObservable: " + Thread.currentThread().getId()); 
       subscriber.onNext(String.valueOf(i * 100)); 
       Thread.sleep(300); 
      } 
      subscriber.onCompleted(); 
     } catch (Exception e) { 
      subscriber.onError(e); 
     } 
    }).subscribeOn(Schedulers.io()); 
} 
+0

所以你想說,無論我已經寫了'.subscribeOn(Schedulers.io())',合併訂閱對觀測同一個線程?爲什麼線程ID不同於可觀察對象? – orium

+0

不同的線程,因爲間隔的默認subscribeOn-scheduler是來自RxJava的計算線程池。它有一個默認的調度程序,所以現在會產生死鎖,如果價值生產會在與調用相同的線程上發生。 getObservable具有noe默認調度程序,所以它將在subscribeOn中提供的io調度程序上訂閱。由於它已訂閱,它將開始在io-threadpool上生成值。調用線程被阻塞,直到Observable.create停止生成。如果您使用getTimedObservable切換getObservable,您將獲得併發性 –

0

RxJava默認情況下不是多線程的,並且所有內容都在同一個線程上運行。如果你想多線程,你需要使用一個調度器。

getObservable()的末尾添加.subscribe(Subscribers.io())

相關問題