我遇到了使用RxJava背壓的問題。基本上,我有一個生產者生產的產品比消費者可以處理的要多,並且希望有一些緩衝隊列來處理我可以處理的物品,並請求完成其中的一些物品,如本例中:RxJava/RxScala使用請求的背壓
object Tester extends App {
Observable[Int] { subscriber =>
(1 to 100).foreach { e =>
subscriber.onNext(e)
Thread.sleep(100)
println("produced " + e + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
}
}
.subscribeOn(NewThreadScheduler())
.observeOn(ComputationScheduler())
.subscribe(
new Subscriber[Int]() {
override def onStart(): Unit = {
request(2)
}
override def onNext(value: Int): Unit = {
Thread.sleep(1000)
println("consumed " + value + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
request(1)
}
override def onCompleted(): Unit = {
println("finished ")
}
})
Thread.sleep(100000)
我希望得到輸出如下
produced 1(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 2(RxNewThreadScheduler-113)
consumed 2(RxComputationThreadPool-312)
produced 3(RxNewThreadScheduler-113)
consumed 3(RxComputationThreadPool-312)
......
,而是,我得到
produced 1(RxNewThreadScheduler-113)
produced 2(RxNewThreadScheduler-113)
produced 3(RxNewThreadScheduler-113)
produced 4(RxNewThreadScheduler-113)
produced 5(RxNewThreadScheduler-113)
produced 6(RxNewThreadScheduler-113)
produced 7(RxNewThreadScheduler-113)
produced 8(RxNewThreadScheduler-113)
produced 9(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 10(RxNewThreadScheduler-113)
produced 11(RxNewThreadScheduler-113)
produced 12(RxNewThreadScheduler-113)
produced 13(RxNewThreadScheduler-113)
.....
添加'onError'處理程序到你的用戶,讓您獲取有關故障的信息。 –