2016-10-20 48 views
1

我一直在使用Observable.fromEmitter()作爲Observable.create()的神奇替代品。我最近遇到了一些奇怪的行爲,我無法弄清楚爲什麼會出現這種情況。我真的很感謝有背壓和調度知識的人來看看這個。RxJava Observable.fromEmitter奇怪的背壓行爲

public final class EmitterTest { 
    public static void main(String[] args) { 
    Observable<Integer> obs = Observable.fromEmitter(emitter -> { 
     for (int i = 1; i < 1000; i++) { 
     if (i % 5 == 0) { 
      sleep(300L); 
     } 

     emitter.onNext(i); 
     } 

     emitter.onCompleted(); 
    }, Emitter.BackpressureMode.LATEST); 

    obs.subscribeOn(Schedulers.computation()) 
     .observeOn(Schedulers.computation()) 
     .subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128" 

    sleep(10000L); 
    } 

    private static void sleep(Long duration) { 
    try { 
     Thread.sleep(duration); 
    } catch (InterruptedException e) { 
     throw new RuntimeException(e); 
    } 
    } 
} 

本申請的輸出是

Received 1 
Received 2 
... 
Received 128 

然後,它仍然停留在128(assumedly因爲這是RxJava的默認緩衝器大小)。

如果我將fromEmitter()中指定的模式更改爲BackpressureMode.NONE,則代碼將按預期工作。如果我刪除對observeOn()的呼叫,它也可以按預期工作。有人能夠解釋爲什麼會出現這種情況嗎?

+0

這是奇怪的,它不應該在所有的停止。即使在observeOn中使用toBlocking()或更小的緩衝區大小也會停止。我會進一步調查。 – akarnokd

+0

什麼是RX Java 2.0中Observable.fromEmitter的等價物? – Mike6679

+1

'Observable.create()'在2.0中替換'fromEmitter()'。如果你想使用'Observable.unsafeCreate()'這個舊的,可怕的創建行爲。 –

回答

2

這是一個相同的池死鎖情況。 subscribeOn在它正在使用的同一個線程上安排下游request,但是如果該線程忙於睡眠/發射循環,則請求永遠不會傳送到fromEmitter,因此在某段時間LATEST開始將元素放下直到非常結束如果主要來源等待足夠長時間,則會傳遞最後一個值(999)。 (這與我們刪除的onBackpressureBlock類似。)

如果subscribeOn沒有執行此請求調度,那麼此示例將工作順利。

我打開an issue來找出解決方案。

解決辦法,現在是使用更大的緩衝區大小與observeOn(有一個過載),或使用fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

+0

感謝您的簡潔回答,David!欣賞它。 –

1

這並不奇怪,這是預期的。

讓我們來追蹤調用。首先:

Observable.subscribe(Subscriber<? super T> subscriber)

Observable.subscribe(Subscriber<? super T> subscriber, Observable<T> observable)

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);

等。調查的構造函數:

OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize)

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { 
    this.scheduler = scheduler; 
    this.delayError = delayError; 
    this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; 
} 

如果不指定緩衝區,默認是RxRingBuffer.SIZE,其大小與平臺有關。

這就是爲什麼當您調用observeOn運算符而沒有緩衝區大小時,默認值爲128(Android上爲16)。

此問題的解決方案非常簡單:只需使用另一個observeOn運算符並聲明緩衝區大小。但是,如果您聲明緩衝區大小爲1000(與來自發射器的元素數量相同),則程序仍然會結束而不會釋放所有值(大約爲170)。爲什麼?因爲程序結束。主線程在10 000秒後結束,並且您在另一個線程(Schedulers.computation())中完成計算。解決方案呢?使用CountdownLatch。請注意不要使用它的製作,只是爲了測試而保留。

+0

感謝您的回答!我最終更喜歡David的回答,但是我很欣賞你爲解析這些電話而付出的努力。 –