1
我正在尋找一種方法來將多個事件流與merge()運算符組合在一起,並且具有有限數量的「活動流」。例如,當我合併6個流(它們都在不同的線程上工作)時,merge()將訂閱所有這些流,但我需要它首先訂閱3個流,當其中一個流完成時另一個應該訂閱直到所有6個流都完成。是否有可能通過合併或任何其他RxJava運算符實現此目的,還是應該由我自己編寫?如何限制RxJava merge()中活動流的數量?
我正在尋找一種方法來將多個事件流與merge()運算符組合在一起,並且具有有限數量的「活動流」。例如,當我合併6個流(它們都在不同的線程上工作)時,merge()將訂閱所有這些流,但我需要它首先訂閱3個流,當其中一個流完成時另一個應該訂閱直到所有6個流都完成。是否有可能通過合併或任何其他RxJava運算符實現此目的,還是應該由我自己編寫?如何限制RxJava merge()中活動流的數量?
您可以在合併操作符中使用maxConcurrency的值,但是您必須將所有Observable包裝在一起。
@Test
public void testMergeMaxConcurrency() {
Observable.merge(Observable.just(
Observable.just(3),
Observable.just(5),
Observable.just(1),
Observable.just(4),
Observable.just(2)), 2)
.collect(ArrayList<Integer>::new, ArrayList::add)
.doOnNext(Collections::sort)
.subscribe(System.out::println);
}
隨便挑一個,有一個名爲'maxConcurrency'參數:http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#merge(java.lang.Iterable, %20int) – akarnokd