我試圖在RxJava(實際上是RxScala)中實現ObserveLatestOn運算符。如何在RxJava(RxScala)中實現observeLatestOn?
當我們有一個快速的生產者和一個慢速的訂閱者時,這個操作符是很有用的,但是訂閱者並不關心消費物品時丟失的物品。
甲大理石圖:
--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|
的=
字符表示長期運行的工作由訂戶執行的,所述>
字符表示工作剛剛完成。作爲使用的典型例子,想象一些需要顯示的數據的生產者,以及作爲訂戶的數據的屏幕渲染器。渲染需要很長時間,但我們不需要渲染屏幕上的每一步,只是最後一個非常好。
在上面的大理石圖中,生產者信號1.訂戶開始處理它,並且花費很長時間。同時,生產者發出2和3,並且在此之後訂戶完成工作。它看到生產者發出的最後一個項目是3,所以它開始處理它。這很快,同時沒有產生新的項目,所以用戶可以休息。然後,5到達,故事以相同的方式繼續。
我花了幾個小時試圖實現這個看似簡單的操作符,但我仍然不滿意。運算符的本質表明它應該是異步的,它應該在不同的調度程序上發送它們的項目,而不是接收它們。但與此同時,當然,我不想讓工作人員佔用線程,而不需要完成任何工作。
這是我想出迄今:
def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
@volatile var maybeNextItem: Option[Notification[T]] = None
@volatile var isWorkScheduled = false
val itemsQueueLock = new Object()
Observable(subscriber ⇒ {
def signalToSubscriber(materializedItem: Notification[T]): Unit = {
materializedItem match {
case OnNext(item) ⇒ subscriber onNext item
case OnError(error) ⇒ subscriber onError error
case OnCompleted ⇒ subscriber.onCompleted()
}
}
def queueItem(item: Notification[T]): Unit = {
val worker = scheduler.createWorker
val shouldScheduleWork = itemsQueueLock synchronized {
val result = !isWorkScheduled
maybeNextItem = Some(item)
isWorkScheduled = true
result
}
if (shouldScheduleWork) {
worker.scheduleRec {
val maybeNextItemToSignal = itemsQueueLock synchronized {
val result = maybeNextItem
if (result.isEmpty) {
worker.unsubscribe()
isWorkScheduled = false
}
maybeNextItem = None
result
}
maybeNextItemToSignal foreach signalToSubscriber
}
}
}
o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
next ⇒ queueItem(OnNext(next)),
error ⇒ queueItem(OnError(error)),
() ⇒ queueItem(OnCompleted)
)
})
}
看來工作,但我不覺得有信心,有沒有競爭條件和死鎖。另外,我不確定解決方案是否可以變得更簡單。我也一直在思考的另一種方法,如
- 在某個時間巧妙地利用可觀察到的請求只是一個項目的
OperatorDebounceWithSelector
- 組合,
observeOn
和onBackpressureBuffer(1)
我也不要」不知道如何編寫確定性的單元測試。 scheduleRec
預定的工作與TestScheduler
一起使用時不能被中斷,我需要使用一個調度程序,它可以在不同的線程上運行。我發現很難爲多線程代碼的競態條件編寫正確的單元測試。
所以,問題依然存在:我的解決方案是否正確?有沒有更簡單,更好或更正確的方法?以及如何測試它的正確性?
對我來說沒問題。所有的標誌都是本地的,或者在同步部分使用,所以它可能應該是免費的競賽條件。很高興瞭解'lift'和'Notification.accept'。我會用你的版本,謝謝。 –