2017-06-29 109 views
0

我有一個Observable<Observable<String>> c。我想用新行將每個內部Observable合併爲一個String。結果應該是Observable<String>如何組合最近的一個Observable <Observable <T>>?

有一個similar discussion for RxJS on GitHub

下面是使用zip一個實現:

final Observable<String> d = Observable.zip(c, objects -> Arrays.stream(objects) 
    .map(x -> (String)x) 
    .collect(Collectors.joining("\n"))); 

的問題是,zip對元素,因此如果內Observable對象具有不同的長度,那麼不一切都被示出。我想用combineLatest代替,但似乎沒有成爲一個匹配功能:

public static <T, R> Observable<R> combineLatest(
    ObservableSource<? extends ObservableSource<? extends T>> sources, 
    Function<? super Object[], ? extends R> zipper) 

我應該如何實現呢?


舉例來說,如果我有三個觀測:

// a 
Observable.just("Started", "0%", "5%", "10%", "20%", "40%", "95%", "100%", "Finished") 

// b 
Observable.just("Started", "0%", "10%", "30%", "40%", "100%", "Finished") 

// c 
Observable.just("Started", "0%", "20%", "25%", "40%", "70%", "90%", "100%", "Finished") 

然後我zip解決方案的輸出是:

--------------------- 
a: Started 
b: Started 
c: Started 
--------------------- 
a: 0% 
b: 0% 
c: 0% 
--------------------- 
a: 5% 
b: 10% 
c: 20% 
--------------------- 
a: 10% 
b: 30% 
c: 25% 
--------------------- 
a: 20% 
b: 40% 
c: 40% 
--------------------- 
a: 40% 
b: 100% 
c: 70% 
--------------------- 
a: 95% 
b: Finished 
c: 90% 

但所需的輸出是一樣的東西:

--------------------- 
a: Started 
b: Started 
c: Started 
--------------------- 
a: 0% 
b: 0% 
c: 0% 
--------------------- 
a: 5% 
b: 10% 
c: 20% 
--------------------- 
a: 10% 
b: 30% 
c: 25% 
--------------------- 
a: 20% 
b: 40% 
c: 40% 
--------------------- 
a: 40% 
b: 100% 
c: 70% 
--------------------- 
a: 95% 
b: Finished 
c: 90% 
--------------------- 
a: 100% 
b: Finished 
c: 100% 
--------------------- 
a: Finished 
b: Finished 
c: Finished 

回答

0

使用RxJava2(和Java 8)

c.toList().toObservable() 
      .flatMap(t -> 
        Observable.combineLatest(t, 
              objects -> Arrays.stream(objects) 
                  .map(x ->(String)x).collect(Collectors.joining("\n")))) 

使用RxJava

c.toList().flatMap(t -> 
        Observable.combineLatest(t, 
              objects -> Arrays.stream(objects) 
                  .map(x -> (String)x).collect(Collectors.joining("\n")))); 

我懷疑它會做你想要什麼,但因爲它是完全有可能將「跳過」排放量,但就是這樣combineLatest作品。

+0

這似乎加入了錯誤的方式。在我的'zip'版本中,每個內部observables在每個結果元素中都有一行。在這個版本中,每個結果元素都有一個內部可觀察對象的所有行。 – sdgfsdh

+0

您可以添加一個輸入示例和您想要輸出的內容嗎? – JohnWowUs

+0

我更新了問題@JohnWowUs – sdgfsdh

相關問題