2016-12-02 55 views
1

我試圖使用窗口/緩衝區操作符,但我最終有同樣的問題。由於緩衝區和窗口返回一個可觀察到的每個意志,我最終有一個Observable<ArrayList<String>>在我的回報,但我需要一個List<Integer>平坦的窗口/緩衝區上的所有observables

任何想法如何平坦所有這些observables?

@Test 
public void test() { 
    Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; 
    Observable.from(numbers) 
      .window(4) 
      .flatMap(ns -> ns 
        .map(number -> "uniqueKey=" + number + "&") 
        .reduce("", String::concat)) 
      .map(query -> query.substring(0, query.length() - 1)) 
      .collect(ArrayList<String>::new, List::add); //-> Given an error since it´s an Observable<ArrayList<String>> 

} 

如果我訂閱了observable,並且我在外部列表中添加了所有元素,但它不是我們所希望的。

.map(query -> query.substring(0, query.length() - 1)) 
      .subscribe(elements::add); 

UPDATE:

既然不能刪除的問題,只是確認,顯然是爲了獲得該項目發出的唯一途徑是通過訂閱,或toBlocking。

我想我現在用了太多的Stream()手掌圖標!

問候。

回答

2

這是預期的行爲,因爲所有操作員都返回Observable。如果你需要阻塞,直到你的代碼完成,你可以使用.toBlocking().first()

+0

謝謝你的答案,但正如我在我的回答中所說,如果我必須等待,我會使用訂閱,據我所知使用.toBlocking()。首先()它是一種反模式,只能在測試場景中使用。 – paul

+0

如果你想接收一個'List',你需要'訂閱'到一個'Observable',並把它放到'onNext()'或者塊中。還有'forEach'運算符,但是你將需要一個外部'List',因爲它返回void。 –

相關問題