我想了解rxjava如何合併工作。因此,這裏是簡單的代碼應該合併來自2個觀測結果和發射到訂戶RxJava合併用戶只能從第一個可觀察到的結果獲得結果
Observable.merge(getObservable(), getTimedObservable())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override public void call(final String s) {
Log.i("test", s);
}
});
private Observable<String> getTimedObservable() {
return Observable.interval(150, TimeUnit.MILLISECONDS)
.map(new Func1<Long, String>() {
@Override public String call(final Long aLong) {
Log.i("test", "tick thread: " + Thread.currentThread().getId());
return String.valueOf(aLong);
}
});
}
public Observable<String> getObservable() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(final Subscriber<? super String> subscriber) {
try {
Log.i("test", "simple observable thread: " + Thread.currentThread().getId());
for (int i = 1; i <= 10; i++) {
subscriber.onNext(String.valueOf(i * 100));
Thread.sleep(300);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}
我預計,合併的結果在訂戶會像
或類似的東西,但是,實際結果是:
test: simple observable thread: 257
test: 100
test: 200
test: 300
test: 400
test: 500
test: 600
test: 700
test: 800
test: 900
test: 1000
test: tick thread: 254
test: 0
test: tick thread: 254
test: 1
test: tick thread: 254
test: 2
test: tick thread: 254
test: 3
test: tick thread: 254
test: 4
test: tick thread: 254
test: 5
test: tick thread: 254
test: 6
test: tick thread: 254
test: 7
test: tick thread: 254
test: 8
test: tick thread: 254
test: 9
test: tick thread: 254
test: 10
test: tick thread: 254
test: 11
test: tick thread: 254
test: 12
test: tick thread: 254
test: 13
它看起來像Thread.sleep在第一個Observable塊發射第二個可觀察,但我不明白如何。有人可以解釋嗎?
所以你想說,無論我已經寫了'.subscribeOn(Schedulers.io())',合併訂閱對觀測同一個線程?爲什麼線程ID不同於可觀察對象? – orium
不同的線程,因爲間隔的默認subscribeOn-scheduler是來自RxJava的計算線程池。它有一個默認的調度程序,所以現在會產生死鎖,如果價值生產會在與調用相同的線程上發生。 getObservable具有noe默認調度程序,所以它將在subscribeOn中提供的io調度程序上訂閱。由於它已訂閱,它將開始在io-threadpool上生成值。調用線程被阻塞,直到Observable.create停止生成。如果您使用getTimedObservable切換getObservable,您將獲得併發性 –