我有一個使用RxJava observables設置的事件序列。基本上我合併了Observable.just(Events.*)
創建的不同延遲與observable.delay(time, timeUnit, scheduler)
函數設置的不同延遲。然後我將它們發佈到PublishSubject
(events
,代碼如下),並訂閱該PublishSubject
以觀察序列(observeEvents()
函數在下面的代碼中)。它曾經工作得很好,但最近我在我的設備上看到了一個非常奇怪的行爲(OnePlus One with android 5.0.2)(並且沒有在模擬器上看到它)。基本上,事件會混淆不清,延遲較高的事件可能會在延遲較小的事件之前發生,延遲較小的事件可能會在隊列末尾發生,有時所有事件都可能按正確的順序排列。前3場比賽經常混合在一起。有時候根本沒有觀察到一些事件。這裏會發生什麼?RxJava延遲可觀察到的火災意外延遲
的代碼在科特林:
var computationScheduler = Schedulers.computation()
private val events: PublishSubject<Events> = PublishSubject.create()
private val userActionSubject: PublishSubject<Events> = PublishSubject.create()
Observable.merge(
event0(),
event1(),
event2(),
userActionOrEvent3(),
userActionOrEvent4())
.subscribe({
// Weird timings are observed here already
events.onNext(it)
}, { e ->
events.onError(e)
}))
private fun userActionOrEvent4(): Observable<Events> {
return Observable.amb(Observable.just(Events.Event4)
.delay(12800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
.take(1)
}
private fun userActionOrEvent3(): Observable<Events> {
return Observable.amb(Observable.just(Events.Event3)
.delay(2800, TimeUnit.MILLISECONDS, computationScheduler), userActionSubject.asObservable().subscribeOn(computationScheduler))
.take(1)
}
private fun event2() = Observable.just(Events.Event2)
.delay(1800, TimeUnit.MILLISECONDS, computationScheduler)
private fun event1() = Observable.just(Events.Event1)
.delay(200, TimeUnit.MILLISECONDS, computationScheduler)
private fun event0() = Observable.just(Events.Event0)
.subscribeOn(computationScheduler)
open fun observeEvents(): Observable<Events> = events.asObservable().observeOn(AndroidSchedulers.mainThread())
open fun onUserAction() {
userActionSubject.onNext(Events.Action)
}