2016-06-08 85 views
4

我有一個同時上傳多個文件到雲存儲的方法。它看起來是這樣的:RxJava - 上傳順序文件 - 發出下一個項目,當onNext稱爲

List<String> files = Arrays.asList("file0", "file1", "file2"); 

Observable.from(files) 
     .flatMap(file -> uploadFile(file) 
       .flatMap(done -> notifyFinished(file))) 
     .subscribe(this::onNext, this::onError, this::onCompleted); 


private Observable<Boolean> uploadFile(String file) { 
    Timber.d("Uploading: " + file); 
    return Observable.just(true).delay(6, TimeUnit.SECONDS); 
} 

private Observable<Boolean> notifyFinished(String file) { 
    Timber.d("Notify finished: " + file); 
    return Observable.just(true).delay(3, TimeUnit.SECONDS); 
} 

的這個輸出是:

06-09 02:10:04.779 D: Uploading: file0 
06-09 02:10:04.780 D: Uploading: file1 
06-09 02:10:04.781 D: Uploading: file2 
06-09 02:10:10.782 D: Notify finished: file1 
06-09 02:10:10.782 D: Notify finished: file0 
06-09 02:10:10.783 D: Notify finished: file2 
06-09 02:10:13.784 D: onNext 
06-09 02:10:13.786 D: onNext 
06-09 02:10:13.786 D: onNext 
06-09 02:10:13.787 D: onCompleted 

我想讓它按順序工作,比如:

1) Uploading: file0 
2) Notify finished: file0 
3) onNext 
4) Uploading: file1 
5) Notify finished: file1 
6) onNext 
    ... 

是否有可能做這樣的事情這與Rx?

編輯

更換第一flatMapconcatMap做的工作。我想,我知道這些運營商之間的差異,但這個例子只是表明,我什麼都不知道......現在的輸出是:

06-09 02:15:00.581 D: Uploading: file0 
06-09 02:15:06.584 D: Notify finished: file0 
06-09 02:15:09.586 D: onNext 
06-09 02:15:09.587 D: Uploading: file1 
06-09 02:15:15.590 D: Notify finished: file1 
06-09 02:15:18.593 D: onNext 
06-09 02:15:18.595 D: Uploading: file2 
06-09 02:15:24.598 D: Notify finished: file2 
06-09 02:15:27.599 D: onNext 
06-09 02:15:27.601 D: onCompleted 
+0

結合觀測什麼是你想要做的更多的例子嗎?這看起來已經可以順序工作了。你看到什麼順序? – Whymarrh

+0

示例很糟糕,請檢查我的編輯。 – rafakob

回答

5

,如果你想「訂購」連續順序,只需使用concatMap(),而不是flatMap()

1

創建每個文件可觀察到的和Concat的三個觀測

@Test 
public void testContact() { 

    Observable.concat(Observable.just(uploadFile(file1)), 
         Observable.just(uploadFile(file2)), 
         Observable.just(uploadFile(file3))) 
       .flatMap(file -> notifyFinished(file))) 
       .subscribe(this::onNext, this::onError, this::onCompleted); 
} 

你將不得不作出的方法notifyFinished返回觀察到的文件,而不是布爾值。

您也可以使用合併或zip,你在這裏https://github.com/politrons/reactive/tree/master/src/test/java/rx/observables/combining

相關問題