2017-08-14 22 views
1

我有兩個不同類型的未分類observables。這兩種類型共享一個共同的密鑰。我想將它們加入到一個新的可觀察對象中,並且我無法弄清楚如何去做。如何通過鍵加入兩個RxJava2 Obvervables?

請注意,某些鍵可能會丟失。如果沒有完成配對,那麼沒關係,但是用null代替缺失的配對會更好。

輸入1:

Entity(id = 2), 
Entity(id = 1), 
Entity(id = 4) 

輸入2:

Dto(id = 3), 
Dto(id = 2), 
Dto(id = 1) 

預期輸出(以任何次序):

Pair(Entity(id = 1), Dto(id = 1)), 
Pair(Entity(id = 2), Dto(id = 2)), 
Pair(null, Dto(id = 3)), 
Pair(Entity(id = 4), null) 
+1

您希望Observables等待相應元素多長時間? –

+0

無限期我想。來自DB和Dtos的實體來自Http請求,所以我應該在內存中擁有完整的數據集 –

+0

如果是這樣,爲什麼要進行流式處理,可能會更容易將它們轉換爲內存集合,然後進行轉換。 –

回答

1

首先,Observable.merge流一起:這給你一個所有物品的流。 (在下面的代碼,我使用的自定義類Either來標記每個流。)

然後,對於流中的每個項目,嘗試與其它類型的先前觀察到的項匹配它,並輸出這對。如果沒有,請將其保存爲稍後匹配。

最後,一旦流完成,其餘不匹配的元素將不會與任何內容匹配,因此它們可以不成對排放。

import io.reactivex.Observable 

data class Entity(val id: Int) 
data class Dto(val id: Int) 

sealed class Either<out A, out B> 
data class Left<A>(val value: A) : Either<A, Nothing>() 
data class Right<B>(val value: B) : Either<Nothing, B>() 

fun <A : Any, B : Any, C> joinById(a: Observable<A>, idA: (A) -> C, b: Observable<B>, idB : (B) -> C): Observable<Pair<A?, B?>> { 
    val unmatchedA = mutableMapOf<C, A>() 
    val unmatchedB = mutableMapOf<C, B>() 
    val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest -> 
     when (latest) { 
      is Left -> { 
       val id = idA(latest.value) 
       unmatchedB.remove(id)?.let { [email protected] Observable.just(latest.value to it) } 
       unmatchedA.put(id, latest.value) 
      } 
      is Right -> { 
       val id = idB(latest.value) 
       unmatchedA.remove(id)?.let { [email protected] Observable.just(it to latest.value) } 
       unmatchedB.put(id, latest.value) 
      } 
     } 
     Observable.empty<Nothing>() 
    } 
    return Observable.concat(merged, Observable.create { emitter -> 
     unmatchedA.values.forEach { emitter.onNext(it to null) } 
     unmatchedB.values.forEach { emitter.onNext(null to it) } 
     emitter.onComplete() 
    }) 
} 

fun main(args: Array<String>) { 
    val entities = Observable.just(Entity(2), Entity(1), Entity(4)) 
    val dtos = Observable.just(Dto(3), Dto(2), Dto(1)) 
    joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println) 
} 
(Entity(id=2), Dto(id=2)) 
(Entity(id=1), Dto(id=1)) 
(Entity(id=4), null) 
(null, Dto(id=3)) 

注意,這可能有一些奇怪的行爲,如果ID流中的重複,並根據流的結構有可能,這將最終緩衝很多存儲元件。