2017-08-17 32 views
1

我正在尋找一種方法來將多個事件流與merge()運算符組合在一起,並且具有有限數量的「活動流」。例如,當我合併6個流(它們都在不同的線程上工作)時,merge()將訂閱所有這些流,但我需要它首先訂閱3個流,當其中一個流完成時另一個應該訂閱直到所有6個流都完成。是否有可能通過合併或任何其他RxJava運算符實現此目的,還是應該由我自己編寫?如何限制RxJava merge()中活動流的數量?

+1

隨便挑一個,有一個名爲'maxConcurrency'參數:http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#merge(java.lang.Iterable, %20int) – akarnokd

回答

1

您可以在合併操作符中使用maxConcurrency的值,但是您必須將所有Observable包裝在一起。

@Test 
    public void testMergeMaxConcurrency() { 
     Observable.merge(Observable.just(
       Observable.just(3), 
       Observable.just(5), 
       Observable.just(1), 
       Observable.just(4), 
       Observable.just(2)), 2) 
       .collect(ArrayList<Integer>::new, ArrayList::add) 
       .doOnNext(Collections::sort) 
       .subscribe(System.out::println); 
    }