2017-09-24 46 views
0

我正在加載一些項目以在單個網頁上顯示。與此同時,我正在加載物品的價格。簡化它看起來像這樣:RxJava。加入來自兩個Observable/Flowable流的所有項目

Observable<Integer> ids = itemIdsToShow(); // COLD Observable 
Observable<Item> items = ids.flatMap(id -> loadingItem(id)); 
Observable<Price> prices = ids.flatMap(id -> loadingPrice(id)); 

更新:「物品」和「價格」都沒有排序。

現在我想將它們結合在一起。

Observable<Long> wait = Observable.interval(1000, TimeUnit.SECONDS); 
Observable<Pair<Item, Price>> pairs = items.join(prices, (ii)->wait, (pp)->wait, Pair::of); 

Observable<Item2> items2 = pairs.filter(p->p.a.id == p.b.id).map(p->new Item2(p.a, p.b)); 

它有效,但Observable「wait」看起來很奇怪。或者,我可以使用任何從未完成的Observable。其實我需要一個可觀察的,完成「項目」和「價格」的完成。

下一個建議不起作用。

Observable<Object> wait = items.mergeWith(prices).takeLast(1); 

法「加入」訂閱這個新創建的再觀察的,從而啓動了新一代的ID序列的(一切從頭開始)。

我很確定,有一個乾淨而好看的方式來完成全連接而不創建自定義實現。

回答

0

Zip operator完全是這樣。爲了您的例子中,你想最終是這樣的:

Observable.zip(Item, Price, Pair<Item, Price>)(items, prices, 
BiFunction { item, price -> 
    Pair(item, price) 
}) 

你可以看到對此進行了詳細解釋here一個例子。

編輯:根據更新的問題,他們沒有訂購,因爲flatMap不保留秩序。取而代之的是結帳concatMap哪(direct link to the marble diagram)。

+0

只有在兩個可觀察對象以相同順序生成數據時,它纔有效。它不是這種情況: - ( – 30thh

+0

結帳concatMap而不是flatMap –

+0

取回部分在問題中非常簡化,原來它包含一個包含多路複用和解複用的長鏈觀測值,如果我將所有「flatMap」調用替換爲「 concatMap「,它會使代碼變得不可讀,並且會失去併發執行的好處。||||||這個問題不是關於讀取,而是關於連接。 – 30thh