2016-01-15 51 views
1

我遇到了使用RxJava背壓的問題。基本上,我有一個生產者生產的產品比消費者可以處理的要多,並且希望有一些緩衝隊列來處理我可以處理的物品,並請求完成其中的一些物品,如本例中:RxJava/RxScala使用請求的背壓

object Tester extends App { 

Observable[Int] { subscriber => 
    (1 to 100).foreach { e => 
    subscriber.onNext(e) 
    Thread.sleep(100) 
    println("produced " + e + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")") 
    } 
} 
.subscribeOn(NewThreadScheduler()) 
.observeOn(ComputationScheduler()) 
.subscribe(
    new Subscriber[Int]() { 
    override def onStart(): Unit = { 
     request(2) 
    } 

    override def onNext(value: Int): Unit = { 
     Thread.sleep(1000) 
     println("consumed " + value + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")") 
     request(1) 
    } 

    override def onCompleted(): Unit = { 
     println("finished ") 
    } 
}) 

Thread.sleep(100000) 

我希望得到輸出如下

produced 1(RxNewThreadScheduler-113) 
consumed 1(RxComputationThreadPool-312) 
produced 2(RxNewThreadScheduler-113) 
consumed 2(RxComputationThreadPool-312) 
produced 3(RxNewThreadScheduler-113) 
consumed 3(RxComputationThreadPool-312) 
...... 

,而是,我得到

produced 1(RxNewThreadScheduler-113) 
produced 2(RxNewThreadScheduler-113) 
produced 3(RxNewThreadScheduler-113) 
produced 4(RxNewThreadScheduler-113) 
produced 5(RxNewThreadScheduler-113) 
produced 6(RxNewThreadScheduler-113) 
produced 7(RxNewThreadScheduler-113) 
produced 8(RxNewThreadScheduler-113) 
produced 9(RxNewThreadScheduler-113) 
consumed 1(RxComputationThreadPool-312) 
produced 10(RxNewThreadScheduler-113) 
produced 11(RxNewThreadScheduler-113) 
produced 12(RxNewThreadScheduler-113) 
produced 13(RxNewThreadScheduler-113) 
..... 
+0

添加'onError'處理程序到你的用戶,讓您獲取有關故障的信息。 –

回答

1

當你實現你的Observable使用Observable.create管理背壓取決於你(這不是一項簡單的任務)。在這裏,你的observable簡單地忽略了被動的pull請求(你只是迭代,而不是等待一個調用迭代器的方法next()的請求)。

如果可能的話,儘量使用Observable工廠方法等range,等...和構成使用map/flatMap以獲得期望的源觀察的,如那些將尊重背壓。

否則,請查看最近推出的用於在OnSubscribe實施中正確管理背壓的實驗工具類:AsyncOnSubscribeSyncOnSubscribe

這是一個非常簡單的例子:

Observable<Integer> backpressuredObservable = 
    Observable.create(SyncOnSubscribe.createStateful(
    () -> 0, //starts the state at 0 
    (state, obs) -> { 
     int i = state++; //first i is 1 as desired 
     obs.next(i); 
     if (i == 100) { //maximum is 100, stop there 
      obs.onCompleted(); 
     } 
     return i; //update the state 
})); 
+0

鏈接被破壞 – AndroidEx

+0

我的壞:(似乎最新的javadocs沒有在網上發佈,所以我鏈接到源代碼... –

+0

也我把代碼的lambda版本,但編譯器有點麻煩接受它,所以你可能會更好的使用匿名類的等價物來代替......除非你使用Scala :) –