我一直在使用Observable.fromEmitter()
作爲Observable.create()
的神奇替代品。我最近遇到了一些奇怪的行爲,我無法弄清楚爲什麼會出現這種情況。我真的很感謝有背壓和調度知識的人來看看這個。RxJava Observable.fromEmitter奇怪的背壓行爲
public final class EmitterTest {
public static void main(String[] args) {
Observable<Integer> obs = Observable.fromEmitter(emitter -> {
for (int i = 1; i < 1000; i++) {
if (i % 5 == 0) {
sleep(300L);
}
emitter.onNext(i);
}
emitter.onCompleted();
}, Emitter.BackpressureMode.LATEST);
obs.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"
sleep(10000L);
}
private static void sleep(Long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
本申請的輸出是
Received 1
Received 2
...
Received 128
然後,它仍然停留在128(assumedly因爲這是RxJava的默認緩衝器大小)。
如果我將fromEmitter()
中指定的模式更改爲BackpressureMode.NONE
,則代碼將按預期工作。如果我刪除對observeOn()
的呼叫,它也可以按預期工作。有人能夠解釋爲什麼會出現這種情況嗎?
這是奇怪的,它不應該在所有的停止。即使在observeOn中使用toBlocking()或更小的緩衝區大小也會停止。我會進一步調查。 – akarnokd
什麼是RX Java 2.0中Observable.fromEmitter的等價物? – Mike6679
'Observable.create()'在2.0中替換'fromEmitter()'。如果你想使用'Observable.unsafeCreate()'這個舊的,可怕的創建行爲。 –