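Just use flatMap() and convert the result to a Completable.
Here is an example that simulates network requests executed on the io pool, each returning 2 items, and then runs a computation on each of those items on the computation pool, with everything in parallel: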
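// Assumes RxJava 2.x package names (io.reactivex.*); the class name is a placeholder.
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class ParallelRequestsTest {
    // Random delays simulate latency; this field was missing from the original snippet.
    private final Random random = new Random();

    @Test
    public void foo() throws Exception {
        Observable.range(1, 10)
                .flatMap(this::getNItemsFromNetwork)   // each request subscribes on the io pool
                .flatMap(this::asyncComputation)       // each item is processed on the computation pool
                .ignoreElements()                      // drop the items, keep only completion or error
                .subscribe(() -> System.out.println("onComplete"),
                        (t) -> System.out.println("onError"));
        Thread.sleep(10000);  // keep the test thread alive until the asynchronous work finishes
    }

    Observable<String> getNItemsFromNetwork(int count) {
        return Observable.just(count)
                .subscribeOn(Schedulers.io())
                .doOnNext(i -> System.out.println("Executing request for " + count + " on thread: " + Thread.currentThread()))
                .flatMap(number -> Observable.just("Item nr " + number + ".1", "Item nr " + number + ".2"))
                .delay(random.nextInt(1000), TimeUnit.MILLISECONDS);
    }

    Observable<String> asyncComputation(String string) {
        return Observable.just(string)
                .subscribeOn(Schedulers.computation())
                .delay(random.nextInt(1000), TimeUnit.MILLISECONDS)
                .doOnNext(item -> System.out.println("Computing " + item + " on thread: " + Thread.currentThread()));
    }
}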
And the output, for verification:
Executing request for 7 on thread: Thread[RxCachedThreadScheduler-7,5,main]
Executing request for 6 on thread: Thread[RxCachedThreadScheduler-6,5,main]
Executing request for 5 on thread: Thread[RxCachedThreadScheduler-5,5,main]
Executing request for 1 on thread: Thread[RxCachedThreadScheduler-1,5,main]
Executing request for 4 on thread: Thread[RxCachedThreadScheduler-4,5,main]
Executing request for 3 on thread: Thread[RxCachedThreadScheduler-3,5,main]
Executing request for 8 on thread: Thread[RxCachedThreadScheduler-8,5,main]
Executing request for 2 on thread: Thread[RxCachedThreadScheduler-2,5,main]
Executing request for 9 on thread: Thread[RxCachedThreadScheduler-9,5,main]
Executing request for 10 on thread: Thread[RxCachedThreadScheduler-10,5,main]
Computing Item nr 7.1 on thread: Thread[RxComputationThreadPool-5,5,main]
Computing Item nr 10.2 on thread: Thread[RxComputationThreadPool-2,5,main]
Computing Item nr 6.2 on thread: Thread[RxComputationThreadPool-1,5,main]
Computing Item nr 3.1 on thread: Thread[RxComputationThreadPool-7,5,main]
Computing Item nr 4.1 on thread: Thread[RxComputationThreadPool-7,5,main]
Computing Item nr 3.2 on thread: Thread[RxComputationThreadPool-1,5,main]
Computing Item nr 6.1 on thread: Thread[RxComputationThreadPool-7,5,main]
Computing Item nr 2.1 on thread: Thread[RxComputationThreadPool-7,5,main]
Computing Item nr 5.2 on thread: Thread[RxComputationThreadPool-2,5,main]
Computing Item nr 5.1 on thread: Thread[RxComputationThreadPool-5,5,main]
Computing Item nr 7.2 on thread: Thread[RxComputationThreadPool-2,5,main]
Computing Item nr 2.2 on thread: Thread[RxComputationThreadPool-1,5,main]
Computing Item nr 10.1 on thread: Thread[RxComputationThreadPool-5,5,main]
Computing Item nr 9.1 on thread: Thread[RxComputationThreadPool-5,5,main]
Computing Item nr 4.2 on thread: Thread[RxComputationThreadPool-1,5,main]
Computing Item nr 9.2 on thread: Thread[RxComputationThreadPool-2,5,main]
Computing Item nr 8.1 on thread: Thread[RxComputationThreadPool-5,5,main]
Computing Item nr 8.2 on thread: Thread[RxComputationThreadPool-2,5,main]
Computing Item nr 1.1 on thread: Thread[RxComputationThreadPool-7,5,main]
Computing Item nr 1.2 on thread: Thread[RxComputationThreadPool-1,5,main]
onComplete
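If it helps to see the same shape without RxJava, here is a rough sketch using only java.util.concurrent. It is a swapped-in analogue, not the approach used above: CompletableFuture.allOf() plays the role of the Completable (a single "everything finished" signal), a cached thread pool stands in for the io scheduler, and a fixed pool stands in for the computation scheduler. The names ParallelStagesSketch, fetchItems and runPipeline are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class ParallelStagesSketch {

    // Hypothetical stand-in for a network call: returns two items per request.
    static List<String> fetchItems(int n) {
        return Arrays.asList("Item nr " + n + ".1", "Item nr " + n + ".2");
    }

    // Runs `requests` simulated requests on an I/O pool, processes every returned
    // item on a CPU pool, waits for all of it, and returns the processed items.
    static Set<String> runPipeline(int requests) {
        ExecutorService io = Executors.newCachedThreadPool();
        ExecutorService cpu = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        Set<String> results = ConcurrentHashMap.newKeySet(); // thread-safe set
        try {
            CompletableFuture<?>[] perRequest = IntStream.rangeClosed(1, requests)
                    .mapToObj(n -> CompletableFuture
                            // stage 1: the "request", on the I/O pool
                            .supplyAsync(() -> fetchItems(n), io)
                            // stage 2: one task per item, on the CPU pool
                            .thenCompose(items -> CompletableFuture.allOf(items.stream()
                                    .map(item -> CompletableFuture.runAsync(
                                            () -> results.add("computed " + item), cpu))
                                    .toArray(CompletableFuture[]::new))))
                    .toArray(CompletableFuture[]::new);
            // One completion signal for the whole pipeline; join() blocks until it
            // fires, so no Thread.sleep() is needed.
            CompletableFuture.allOf(perRequest).join();
            return results;
        } finally {
            io.shutdown();
            cpu.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("processed " + runPipeline(10).size() + " items");
    }
}
```

Unlike the test above, this blocks on the combined future rather than sleeping for a fixed time, so it returns as soon as the last item has been processed.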
Is this the complete code? I don't see any point in deferring and merging a single subject. Please post the 'startRequests()' code as well. – Dimezis
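@Dimezis It is a loop that creates and publishes the network requests (the first round of parallel work) using 'requestSubject.onNext(client.createRequest(parameter));'. I think you are right: since creating these requests does not execute them, I can create them on the calling thread and return a Completable that starts them on subscription. The example I posted would not work with a PublishSubject anyway; I think it would need a ReplaySubject. – ferbeb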