2015-04-30 17 views
2

我試圖在RxJava(實際上是RxScala)中實現ObserveLatestOn運算符。如何在RxJava(RxScala)中實現observeLatestOn?

當我們有一個快速的生產者和一個慢速的訂閱者時,這個操作符是很有用的,但是訂閱者並不關心消費物品時丟失的物品。

甲大理石圖:

--1---2---3----------5------6--7-8-9------| 
--1=========>3===>---5=======>6======>9==>| 

=字符表示長期運行的工作由訂戶執行的,所述>字符表示工作剛剛完成。作爲使用的典型例子,想象一些需要顯示的數據的生產者,以及作爲訂戶的數據的屏幕渲染器。渲染需要很長時間,但我們不需要渲染屏幕上的每一步,只是最後一個非常好。

在上面的大理石圖中,生產者信號1.訂戶開始處理它,並且花費很長時間。同時,生產者發出2和3,並且在此之後訂戶完成工作。它看到生產者發出的最後一個項目是3,所以它開始處理它。這很快,同時沒有產生新的項目,所以用戶可以休息。然後,5到達,故事以相同的方式繼續。

我花了幾個小時試圖實現這個看似簡單的操作符,但我仍然不滿意。運算符的本質表明它應該是異步的,它應該在不同的調度程序上發送它們的項目,而不是接收它們。但與此同時,當然,我不想讓工作人員佔用線程,而不需要完成任何工作。

這是我想出迄今:

def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = { 
    @volatile var maybeNextItem: Option[Notification[T]] = None 
    @volatile var isWorkScheduled = false 
    val itemsQueueLock = new Object() 

    Observable(subscriber ⇒ { 
    def signalToSubscriber(materializedItem: Notification[T]): Unit = { 
     materializedItem match { 
     case OnNext(item) ⇒ subscriber onNext item 
     case OnError(error) ⇒ subscriber onError error 
     case OnCompleted ⇒ subscriber.onCompleted() 
     } 
    } 

    def queueItem(item: Notification[T]): Unit = { 
     val worker = scheduler.createWorker 

     val shouldScheduleWork = itemsQueueLock synchronized { 
     val result = !isWorkScheduled 
     maybeNextItem = Some(item) 
     isWorkScheduled = true 
     result 
     } 

     if (shouldScheduleWork) { 
     worker.scheduleRec { 
      val maybeNextItemToSignal = itemsQueueLock synchronized { 
      val result = maybeNextItem 
      if (result.isEmpty) { 
       worker.unsubscribe() 
       isWorkScheduled = false 
      } 
      maybeNextItem = None 
      result 
      } 

      maybeNextItemToSignal foreach signalToSubscriber 
     } 
     } 
    } 

    o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
     next ⇒ queueItem(OnNext(next)), 
     error ⇒ queueItem(OnError(error)), 
    () ⇒ queueItem(OnCompleted) 
    ) 
    }) 
} 

看來工作,但我不覺得有信心,有沒有競爭條件和死鎖。另外,我不確定解決方案是否可以變得更簡單。我也一直在思考的另一種方法,如

  • 在某個時間巧妙地利用可觀察到的請求只是一個項目的OperatorDebounceWithSelector
  • 組合,observeOnonBackpressureBuffer(1)

我也不要」不知道如何編寫確定性的單元測試。 scheduleRec預定的工作與TestScheduler一起使用時不能被中斷,我需要使用一個調度程序,它可以在不同的線程上運行。我發現很難爲多線程代碼的競態條件編寫正確的單元測試。

所以,問題依然存在:我的解決方案是否正確?有沒有更簡單,更好或更正確的方法?以及如何測試它的正確性?

回答

1

我建議使用lift來實現這個操作符。這裏是我的解決方案:

package object ObservableEx { 

    implicit class ObserveLatestOn[T](val o: Observable[T]) { 

    def observeLatestOn(scheduler: Scheduler): Observable[T] = { 
     o.lift { (child: Subscriber[T]) => 
     val worker = scheduler.createWorker 
     child.add(worker) 

     val parent = new Subscriber[T] { 

      private val lock = new AnyRef 

      // protected by "lock" 
      private var latest: Notification[T] = null 
      // protected by "lock" 
      // Means no task runs in the worker 
      private var idle = true 

      private var done = false 

      override def onStart(): Unit = { 
      request(Long.MaxValue) 
      } 

      override def onNext(v: T): Unit = { 
      if (!done) { 
       emit(OnNext(v)) 
      } 
      } 

      override def onCompleted(): Unit = { 
      if (!done) { 
       done = true 
       emit(OnCompleted) 
      } 
      } 

      override def onError(e: Throwable): Unit = { 
      if (!done) { 
       done = true 
       emit(OnError(e)) 
      } 
      } 

      def emit(v: Notification[T]): Unit = { 
      var shouldSchedule = false 
      lock.synchronized { 
       latest = v 
       if (idle) { 
       // worker is idle so we should schedule a task 
       shouldSchedule = true 
       // We will schedule a task, so the worker will be busy 
       idle = false 
       } 
      } 
      if (shouldSchedule) { 
       worker.schedule { 
       var n: Notification[T] = null 
       var exit = false 
       while (!exit) { 
        lock.synchronized { 
        if (latest == null) { 
         // No new item arrives and we are leaving the worker, so set "idle" 
         idle = true 
         exit = true 
        } else { 
         n = latest 
         latest = null 
        } 
        } 
        if (!exit) { 
        n.accept(child) 
        } 
       } 
       } 
      } 
      } 
     } 

     child.add(parent) 

     parent 
     } 
    } 
    } 

} 

以及單元測試

import ObservableEx.ObserveLatestOn 

@Test 
def testObserveLatestOn(): Unit = { 
    val scheduler = TestScheduler() 
    val xs = mutable.ArrayBuffer[Long]() 
    var completed = false 
    Observable.interval(100 milliseconds, scheduler).take(10).observeLatestOn(scheduler).subscribe(v => { 
    scheduler.advanceTimeBy(200 milliseconds) 
    xs += v 
    }, 
    e => e.printStackTrace(), 
    () => completed = true 
) 
    scheduler.advanceTimeBy(100 milliseconds) 
    assert(completed === true) 
    assert(xs === List(0, 2, 4, 6, 8)) 
} 
+0

對我來說沒問題。所有的標誌都是本地的,或者在同步部分使用,所以它可能應該是免費的競賽條件。很高興瞭解'lift'和'Notification.accept'。我會用你的版本,謝謝。 –

0

我有一個PR表示操作員onBackpressureLatest()應該有預期的行爲,但你需要的併發性和可使用observeOn如常。

+0

是的,我也在想這樣的事情。我想你必須將它與'observeOn'和一些只需要一次請求一個項目的操作符結合起來實現我正在尋找的行爲?無論如何,我可能會用zsxswing的解決方案,至少在'onBackpressureLatest'在RxScala版本中。 –