2017-11-18 216 views
1

我想能夠模擬合併兩個單獨的流,它們發出一些對象(即擴展相同的父對象),使用緩衝區操作符緩衝它們並在10秒後發出收集的數據。我希望這種機制是無限的,這種合併/緩衝區總是在來自2個分離的流的排放時被調用。如何模擬發射2 infiite Observable流,並有其他Observable每10秒合併它們並緩衝?

以下是我迄今所做的:

val list1 = mutableListOf<SomeClass1>(
      SomeClass1("1", 1), SomeClass1("2", 2), SomeClass1("3", 3), 
      SomeClass1("4", 4), SomeClass1("5", 5), SomeClass1("6", 6), 
      SomeClass1("7", 7), SomeClass1("8", 8), SomeClass1("9", 9) 
    ) 
    val list2 = mutableListOf<SomeClass2>(
      SomeClass2(1.00), SomeClass2(2.00), SomeClass2(3.00), 
      SomeClass2(4.00), SomeClass2(5.00), SomeClass2(6.00), 
      SomeClass2(7.00), SomeClass2(8.00), SomeClass2(9.00) 
    ) 

    val someClass1Observable = Observable 
      .fromIterable(list1) 
      .zipWith(Observable.interval(2, TimeUnit.SECONDS), 
        BiFunction { item: SomeClass1, _: Long -> item }) 


    val someClass2Observable = Observable 
      .fromIterable(list2) 
      .zipWith(Observable.interval(2, TimeUnit.SECONDS), 
        BiFunction { item: SomeClass2, _: Long -> item }) 


    someClass1Observable.subscribe { 
     Log.v("someClass1", it.toString()) 
    } 

    someClass2Observable.subscribe { 
     Log.v("someClass2", it.toString()) 
    } 


    Observable.merge(someClass1Observable, someClass2Observable) 
      .buffer(10, TimeUnit.SECONDS) 
      .repeat() 
      .doOnSubscribe { Log.v("parentObservable", "STARTED") } 
      .subscribe { t: MutableList<Parent> -> 
       Log.v("parentObservable", "onNext") 
       t.forEach { Log.v("onNext", it.toString()) } 
      } 

    Thread.sleep(30000) 
    Log.v("AFTER_SLEEP", "AFTER_SLEEP") 


    someClass1Observable.subscribe { 
     Log.v("someClass1", it.toString()) 
    } 

    someClass2Observable.subscribe { 
     Log.v("someClass2", it.toString()) 
    } 

2流的第一發射工作正常,合併/緩衝器可觀察到從他們每次10秒後收集的排放。然而,當這兩個流結束排放,我再次訂閱他們,緩衝/合併Observable不是愛好者的工作。如何讓這項工作像無限一樣?有沒有更好的方法來編寫代碼發送對象的2個單獨的流,他們將不需要讀取列表中的值,而不是他們會每隔2秒發出一個新的對象間隔? 如何使合併/緩衝區Observable的工作方式是無限的,我的意思是每當有2個Observable流發生新的輻射時?

回答

0

爲了使這些SomeClass流無限的,你可以簡單地把一個.repeat()運營商對他們在使用前,或者你可以從interval建立在需求的對象:

val someClass1Obs = Observable 
    .interval(2, TimeUnit.SECONDS) 
    .map { SomeClass1("$it", it.toInt()) } // <-- create objects on demand 

我認爲這可以解決您的其他問題太。

相關問題