2017-02-28 50 views
0

我需要創建併發的網絡請求。根據這些請求的結果,可能會開始更多的請求。將併發子任務的未知數合併爲一個Completable

我想獲得一個Completable,一旦所有的請求都完成並且不需要創建進一步的請求,Completable就完成了。

我的問題是,這是可能的用下面的代碼片斷來實現:

return Completable.defer(() -> { 
     startRequests(); 
     return Observable.merge(requestSubject.asObservable()).toCompletable(); 
    }); 

在這個例子中,startRequest將增加網絡請求(改造)到requestSubject,這是一個PublishSubject<Observable<SomeResponse>>

具體而言,我希望網絡請求在訂閱時在IO調度器上啓動,並且返回的Completable不會完成,直到我在其中一個請求的onNext中調用requestSubject.onComplete()

我還沒有弄清楚如何在不執行請求兩次的情況下處理請求的響應(對每個訂閱請求進行修改)。

這樣工作嗎,還是有更好的方法來實現我在找的東西?謝謝!

+0

它是一個完整的代碼?因爲在推遲和合並單個主題方面我沒有看到任何意見。 請同時發佈'startRequests()'代碼。 – Dimezis

+0

@Dimezis這是一個使用'requestSubject.onNext(client.createRequest(parameter));'創建併發布(第一輪並行)網絡請求的循環。我認爲你是對的,因爲創建這些請求不會執行它們,所以我可以在調用線程上創建它們並返回在訂閱時啓動它們的Completable。我發佈的示例無論如何都不會與PublishSubject一起工作,我認爲它需要一個ReplaySubject。 – ferbeb

回答

1

好吧,不知道我的問題是否100%正確,但這是我將做的一個粗略草圖...我相信你想有一個Subject作爲緩存的中間級別,而不是中斷實際請求時你會打電話退訂。

1)假設您有2個改進觀察值。

2)在startRequests()中,您需要訂閱他們兩個(您需要的某個調度程序),應用doOnNext運算符並將數據委託給您的subject。因此,該主題將收到2個來自API的數據。

3)訂閱您的主題,您將收到2個數據滴答。

基本上沒有必要等待完成,你只會收到N數量的onNext刻度。 但是,如果您想要指示所有請求都已完成,則可以合併所有改進的observables,並將所有事件委託給主題,因此最終將收到n個onNext ticks和onComplete。

0

我認爲使用Subjects是不必要的併發症。 您可以簡單地使用flatMap()並轉移到Completable在最後使用toCompletable(),你沒有提到你的具體循環是如何工作的,但假設你有一些List循環查詢就像這樣,其中startRequest(data)返回Retrofit查詢Observable

List<Data> list = ...; 
    Observable.from(list) 
      .flatMap(new Func1<Data, Observable<Result>>() { 
       @Override 
       public Observable<Result> call(Data data) { 
        return startRequest(data); 
       } 
      }).toCompletable(); 

關於你的第二個要求,做更多的請求要看結果,在這種情況下,你可能希望蒐集使用toList()所有請求,你會得到一個onNext()通知,那麼你就可以過濾一切,並獲得Observable,當你想要更多的數據時,它會發射物品:

List<Data> list =...; 
    Observable.from(list) 
      .flatMap(new Func1<Data, Observable<Result>>() { 
       @Override 
       public Observable<Result> call(Data data) { 
        return startRequest(data); 
       } 
      }) 
      .toList() 
      .filter(new Func1<List<Result>, Boolean>() { 
       @Override 
       public Boolean call(List<Result> results) { 
        return shouldRequestMore(results); 
       } 
      }); 
2

只需使用flatmap()並將其轉換爲Completable即可。

這裏是(模擬)中執行它返回上io池2項則在computation池執行關於這些項目的計算網絡請求的示例,一切都平行:

@Test 
public void foo() throws Exception { 
    Observable.range(1, 10) 
      .flatMap(this::getNItemsFromNetwork) 
      .flatMap(this::asyncCompuatation) 
      .ignoreElements() 
      .subscribe(() -> System.out.println("onComplete"), 
        (t) -> System.out.println("onError")); 

    Thread.sleep(10000); 
} 

Observable<String> getNItemsFromNetwork(int count) { 
    return Observable.just(count) 
      .subscribeOn(Schedulers.io()) 
      .doOnNext(i -> System.out.println("Executing request for " + count + " on thread: " + Thread.currentThread())) 
      .flatMap(number -> Observable.just("Item nr " + number + ".1", "Item nr " + number + ".2")) 
      .delay(random.nextInt(1000), TimeUnit.MILLISECONDS); 
} 

Observable<String> asyncCompuatation(String string) { 
    return Observable.just(string) 
      .subscribeOn(Schedulers.computation()) 
      .delay(random.nextInt(1000), TimeUnit.MILLISECONDS) 
      .doOnNext(number -> System.out.println("Computing " + number + " on thread: " + Thread.currentThread())); 
} 

和輸出進行驗證:

Executing request for 7 on thread: Thread[RxCachedThreadScheduler-7,5,main] Executing request for 6 on thread: Thread[RxCachedThreadScheduler-6,5,main] Executing request for 5 on thread: Thread[RxCachedThreadScheduler-5,5,main] Executing request for 1 on thread: Thread[RxCachedThreadScheduler-1,5,main] Executing request for 4 on thread: Thread[RxCachedThreadScheduler-4,5,main] Executing request for 3 on thread: Thread[RxCachedThreadScheduler-3,5,main] Executing request for 8 on thread: Thread[RxCachedThreadScheduler-8,5,main] Executing request for 2 on thread: Thread[RxCachedThreadScheduler-2,5,main] Executing request for 9 on thread: Thread[RxCachedThreadScheduler-9,5,main] Executing request for 10 on thread: Thread[RxCachedThreadScheduler-10,5,main] Computing Item nr 7.1 on thread: Thread[RxComputationThreadPool-5,5,main] Computing Item nr 10.2 on thread: Thread[RxComputationThreadPool-2,5,main] Computing Item nr 6.2 on thread: Thread[RxComputationThreadPool-1,5,main] Computing Item nr 3.1 on thread: Thread[RxComputationThreadPool-7,5,main] Computing Item nr 4.1 on thread: Thread[RxComputationThreadPool-7,5,main] Computing Item nr 3.2 on thread: Thread[RxComputationThreadPool-1,5,main] Computing Item nr 6.1 on thread: Thread[RxComputationThreadPool-7,5,main] Computing Item nr 2.1 on thread: Thread[RxComputationThreadPool-7,5,main] Computing Item nr 5.2 on thread: Thread[RxComputationThreadPool-2,5,main] Computing Item nr 5.1 on thread: Thread[RxComputationThreadPool-5,5,main] Computing Item nr 7.2 on thread: Thread[RxComputationThreadPool-2,5,main] Computing Item nr 2.2 on thread: Thread[RxComputationThreadPool-1,5,main] Computing Item nr 10.1 on thread: Thread[RxComputationThreadPool-5,5,main] Computing Item nr 9.1 on thread: Thread[RxComputationThreadPool-5,5,main] Computing Item nr 4.2 on thread: Thread[RxComputationThreadPool-1,5,main] Computing Item nr 9.2 on thread: Thread[RxComputationThreadPool-2,5,main] Computing Item nr 8.1 on thread: Thread[RxComputationThreadPool-5,5,main] Computing Item nr 8.2 on thread: Thread[RxComputationThreadPool-2,5,main] Computing Item nr 1.1 on thread: Thread[RxComputationThreadPool-7,5,main] Computing Item nr 1.2 on thread: Thread[RxComputationThreadPool-1,5,main] onComplete