2014-12-04 19 views
1

想象一下,您發送事件的訂閱者管道,並且它會一個接一個訪問另一個訂閱者。主題以特定的順序向用戶發送事件並帶背壓

擁有PublishSubject和x個訂閱者/觀察者。通常,事件以特定順序發送給觀察者,但同時不管觀察者何時返回。是否有可能做到這一點流量:

  1. 發射事件osbserverA返回後observerA
  2. ,發出事件observerB observerB返回後
  3. ,發出事件observerC

我m使用RxScalaMonifu Rx實現

Monifu甚至有一個背壓執行:

def onNext(elem: T): Future[Ack] 

我想看「並且結果是:已更改!「此樣品中被打印出來:

val subject = PublishSubject[Int]() 

    var result = "Not Changed" 
    subject.subscribe { i => 
    Observable.timerOneTime(3.seconds, Continue).asFuture.map { x => 
     result = "Changed !!" 
     x.get 
    } 
    } 

    subject.subscribe { i => 
    Observable.timerOneTime(1.seconds, Continue).asFuture.map { x => 
     println("And Result was : " + result) 
     x.get 
    } 
    } 

    subject.onNext(1) 

是在RxScala/RxJava或Monifu有可能不延長主題和壓倒一切的onNext實現這些類被聲明爲final反正所以這將是相當黑客

回答

0

?我認爲答案是一個自定義的主題實現,這樣的事情在Monifu,將飼料觀察員在flatMap方式(忽略的事實是PublishSubject是一個final類):

class PipeSubject extends PublishSubject[RxEvent] { 
    override def onNext(elem: RxEvent): Future[Ack] = { 
    if (!isCompleted) { 
     val observers = subscriptions 
     if (observers.nonEmpty) 
     pipeThroughMany(observers, elem) 
     else 
     Continue 
    } 
    else 
     Cancel 
    } 

private[this] def pipeThroughMany(array: Array[Observer[T]], elem: T): Future[Continue] = { 
    val length = array.length 
    def >>>(idx: Int = 0): Future[Continue] = { 
     val obs = array(idx) 
     obs.onNext(elem).flatMap { 
     case Continue => 
      if (idx+1 < length) 
       >>>(idx+1) 
      else 
      Continue 
     case _ => 
      removeSubscription(obs) 
      Continue 
     } 
    } 
    >>>() 
    } 
} 
相關問題