RxJava中的背壓並不是真正的反壓,而只是忽略了一些元素。RxJava中REAL背壓的最佳實施
但是如果我不能釋放任何元素而且我需要以某種方式減慢情緒?
RxJava不會影響元素意志,所以開發者需要自己實現它。但是如何?
想到最簡單的方法就是使用一些計數器,在完成時遞增和遞減。
就像是:
public static void sleep(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
Scheduler sA = Schedulers.from(Executors.newFixedThreadPool(1));
Scheduler sB = Schedulers.from(Executors.newFixedThreadPool(5));
Observable.create(s -> {
while (!s.isUnsubscribed()) {
if (counter.get() < 100) {
s.onNext(Math.random());
counter.incrementAndGet();
} else {
sleep(100);
}
}
}).subscribeOn(sA)
.flatMap(r ->
Observable.just(r)
.subscribeOn(sB)
.doOnNext(x -> sleep(1000))
.doOnNext(x -> counter.decrementAndGet())
)
.subscribe();
}
但我覺得這樣很可憐。有更好的解決方案嗎?
隊列沒有被反壓。背壓點是向生產者發出信號以減緩排放。這可能不總是可能的,但在絕大多數情況下,這是可能的。這就是爲什麼在JDK9的新反應流規範中顯式反壓。 https://github.com/reactive-streams/reactive-streams-jvm – kaqqao