2017-11-17 209 views
1

一些大寫金額我想測試像這樣的情況後subsrcibe它:如何合併2個獨立的數據流,從他們緩衝填充的數據和時間

我有2類新剛剛從同一個延伸。

我創建,並從項目的每個班級列表觀測量:

val listSomeClass1 = ArrayList<SomeClass1>() 
val listSomeClass2 = ArrayList<SomeClass2>() 

fun populateJust1() { 
    listSomeClass1.add(SomeClass1("23", 23)) 
    listSomeClass1.add(SomeClass1("24", 24)) 
    listSomeClass1.add(SomeClass1("25", 25)) 
} 

fun populateJust2() { 
    listSomeClass2.add(SomeClass2(23.00)) 
    listSomeClass2.add(SomeClass2(24.00)) 
    listSomeClass2.add(SomeClass2(25.00)) 
} 

populateItemsSomeClass1() 
populateItemsSomeClass2() 

現在我可以創建2個觀測值:

val someClass1Observable = Observable.fromIterable(listSomeClass1) 
    val someClass2Observable = Observable.fromIterable(listSomeClass2) 

而在這裏,我想從合併排放他們,緩衝它,並在10秒後訂閱它:

 Observable.merge(someClass1Observable, someClass2Observable) 
      .buffer(10, TimeUnit.SECONDS) 
      .doOnSubscribe { Log.v("parentObservable", "STARTED") } 
      .subscribe { t: MutableList<Parent> -> 
       Log.v("parentObservable", "onNext") 
       t.forEach { Log.v("onNext", it.toString()) } 
      } 

但是,可觀察是沒有像我預期的那樣在10秒後開始,並且隨着這個數據準備好開始immedietaly。 如何模擬這樣的事情,我會收集2個獨立的流,10秒後我將能夠獲得收集的數據

我必須指出,我不想使用任何主題。

UPDATE

我做somehitng這樣的:

val list1 = listOf(SomeClass1("1", 1), SomeClass1("2", 2), SomeClass1("3", 3)) 
    val list2 = listOf(SomeClass2(5.00), SomeClass2(4.00), SomeClass2(6.00)) 

    val someClass1Observable = Observable 
      .fromIterable(list1) 
      .zipWith(Observable.interval(2, TimeUnit.SECONDS), 
        BiFunction { item: SomeClass1, _: Long -> item }) 



    val someClass2Observable = Observable 
      .fromIterable(list2) 
      .zipWith(Observable.interval(1, TimeUnit.SECONDS), 
        BiFunction { item: SomeClass2, _: Long -> item }) 



    someClass1Observable.subscribe { 
       Log.v("someClass1", it.toString()) 
      } 

    someClass2Observable.subscribe { 
       Log.v("someClass2", it.toString()) 
      } 


    Observable.merge(someClass1Observable, someClass2Observable) 
      .buffer(10, TimeUnit.SECONDS) 
      .delay(10, TimeUnit.SECONDS) 
      .doOnSubscribe { Log.v("parentObservable", "STARTED") } 
      .subscribe { t: MutableList<Parent> -> 
       Log.v("parentObservable", "onNext") 
       t.forEach { Log.v("onNext", it.toString()) } 
      } 

    Thread.sleep(13000) 

    someClass1Observable.subscribe { 
     Log.v("someClass1", it.toString()) 
    } 

    someClass2Observable.subscribe { 
     Log.v("someClass2", it.toString()) 
    } 

在這裏,我想只是模擬同爲合併可觀測someClass1和someclass2觀測量和2個無限流。

同樣,我想有能力合併這兩個流,緩衝區填充數據,並在10秒後做一些事情。如果10秒後這兩個流將再次填充一些數據,則合併Observable應該清除先前的緩衝區,並且應該再次緩衝新數據並在10秒後發射等等,無限。然而,我的代碼並沒有像我期望的那樣工作,我需要做些什麼改變才能使它像我描述的那樣?

回答

1

我認爲你正在尋找的delay操作

http://reactivex.io/documentation/operators/delay.html

延遲 的排放量可觀察着由特定的移動量在時間

所以是這樣的:

.delay(10, TimeUnit.SECONDS) 
+0

感謝您的建議,但它很接近,但不會回答我的問題,請參閱我更新的問題,我做錯了什麼? – Konrad

相關問題