2017-07-30 38 views
1

我有多個通知來源:A,B,C(它們都產生不同類型的對象,但有一些共同的屬性),我想合併成一個新條件滿足特定條件時。流的RxJava:基於一定條件合併多個對象流

實施例:

A: [ ObjectA(id=1), ObjectA(id=2), ObjectA(id=3), ObjectA(id=5), ObjectA(id=4)] 
B: [ ObjectB(id=1), ObjectB(id=2), ObjectB(id=3), ObjectB(id=4), ObjectB(id=5)] 
C: [ ObjectC(id=1), ObjectC(id=2), ObjectC(id=3), ObjectC(id=4), ObjectC(id=5)] 

所需的輸出:

Result: [ ObjectABC(id=1), ObjectABC(id=2), ObjectABC(id=3), ObjectA(id=4), ObjectA(id=5)] 

每個ObjectABC將被創建並且當ObjectAObjectBObjectC具有相同的ID被接收添加到結果流。

如果沒有找到匹配,它應該等到找到相同的三元組。

在當前收到的三元組條目不匹配的例子中(類似於第四次迭代A的第四條條目有一個Id = 5),它應該被保留,直到來自其他流的匹配項被處理或丟棄後一段的時間。

這是可以通過RxJava實現的。

+1

如果有比賽,你想結合所有三個。但是,如果ids在某個時間點不匹配,那麼這三個項目會發生什麼? –

+0

@Arnav我已經添加了關於這個的詳細信息 –

+0

因此,對於每一組值,如果所有的id都在結果流上傳遞相同的結果並丟棄其餘的結果?那麼,在期望的輸出中,你不能接受第四和第五個值是正確的嗎? – masp

回答

1

我沒有一個IDE出手,但是這應該解釋邏輯:

// First, extract all of the IDs as they arrive 
final Observable<Integer> ids = as.map(i -> i.id).distinct(); 

// Then, for each ID, extract the ObjectA, ObjectB and ObjectC instances 
// and zip them together. 
final Observable<ObjectABC> abcs = ids.flatMap(id -> Observable.zip(
    as.filter(a -> a.id.equals(id)).take(1), 
    bs.filter(b -> b.id.equals(id)).take(1), 
    cs.filter(c -> c.id.equals(id)).take(1), 
    (a, b, c) -> new ObjectABC(a, b, c)); 

如果要強制排序:

abcs.sort(Comparator.comparing(abc -> abc.id)); 
相關問題