2016-06-08 53 views
0

我有一個由幾個互連組件組成的系統。一切都很好,但經過一段時間後,一些觀察者停止接收觀察者的onNext()發送的項目。RxJava - 觀察者不會在onNext()項目後

一個簡化的情況是這樣的:我有 Component1.start() - >創建一個ConnectableObservable與Observable.create(...).subscribeOn().observeOn().publish(),並訂閱Component2。之後,它連接()s。這個可觀察對象在一個循環中發射一些項目,然後在完成時調用s.onComplete()

Component2實現Observer。另外它有一個ConnectableObservable,它運行一個while(true)循環。當它在其onNext()中獲取值(由Component1調用)時,它會使用自己的ConnectableObservable通知Component0。 (注意我也使用PublishSubject實現了它們,發生的情況也是如此)。

Component1.start() //Creates Component1's ConnectableObservable, subscribes Component2 and starts running with connect(); 

Component1.connectableObservable -> onNext() ---> Component2 

Component2.connectableObservable -> onNext() ---> Component0 

當Component0.onNext()獲取特定項目(經過100次迭代),它停止Component1.observable,使其退出了循環,並呼籲的onComplete()。

過了一段時間,Component0調用Component1.start(),並且所有事情都重新開始。

我所看到的是,當一切正常 Component1.observable.onNext()調用rx.internal.operators.OperatorSubscribeOn.......subscriber.onNext()

rx.internal.operators.OperatorSubscribeOn 

@Override 
public void call(final Subscriber<? super T> subscriber) { 
    final Worker inner = scheduler.createWorker(); 
    subscriber.add(inner); 

    inner.schedule(new Action0() { 
     @Override 
     public void call() { 
      final Thread t = Thread.currentThread(); 

      Subscriber<T> s = new Subscriber<T>(subscriber) { 
       @Override 
       public void onNext(T t) { 
        subscriber.onNext(t); 

subscriber.onNext()是內部類private static final class ObserveOnSubscriber<T>並在這裏結束了調用時間表():

@Override 
public void onNext(final T t) { 
    if (isUnsubscribed() || finished) { 
     return; 
    } 
    if (!queue.offer(on.next(t))) { 
     onError(new MissingBackpressureException()); 
     return; 
    } 
    schedule(); 
} 

時間表()是

protected void schedule() { 
    if (counter.getAndIncrement() == 0) { 
     recursiveScheduler.schedule(this); 
    } 
} 

計數器爲0,因此recursiveScheduler.schedule(this);被調用且Component2獲取該項目。

現在,當它停止工作時會發生的事情是計數器不再是0,實際上每次調用都會增加它。因此,recursiveScheduler.schedule(this);永遠不會被調用,Component2不會得到任何東西。

這可能是什麼原因造成的?爲什麼計數器0在某個點開始增加?

UPDATE:時間表()被調用後,有計劃的任務調用下面的代碼,減少櫃檯當物品沒有丟失:

在源代碼中,我已經看到下面挖
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { 

    // only execute this from schedule() 
    @Override 
    public void call() { 
     ... 
      emitted = currentEmission; 
      missed = counter.addAndGet(-missed); 
      if (missed == 0L) { 
       break; 
      } 

據此,由於項目被遺漏,計數器增加,並且隨後的項目也被遺漏。

項目被遺漏的原因是什麼?

我注意到一些奇怪的東西。如果我從程序中刪除任何其他(未提及)的可觀察項目,則不會漏掉任何項目。他們將Component0視爲觀察者,並在其自己的subscribeOn()線程中生成它們的項目,所以我看不到它們如何影響此場景。

更新2:我已經把試圖找出發生了什麼。當我做Component1.connectableObservable.connect(),它最終調用private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 -> init()

這裏的時間表()被調用:

void init() { 
      // don't want this code in the constructor because `this` can escape through the 
      // setProducer call 
      Subscriber<? super T> localChild = child; 

      localChild.setProducer(new Producer() { 

       @Override 
       public void request(long n) { 
        if (n > 0L) { 
         BackpressureUtils.getAndAddRequest(requested, n); 
         schedule(); 

正確的做法留下OperatorObserveOn.counter = 0時間表後()。當它不再工作時,scheduler()會將OperatorObserveOn.counter的值增加1。

回答

0

獲取每一個訂閱觀察者和創建一個監聽器告訴你什麼時候從可觀察一個觀察者退訂,那麼你就可以理解爲什麼這種情況正在發生。

無論如何,我會看看繼電器,因爲你不必取消訂閱你的observable它更安全,你可以確定它永遠不會停止發射事件。

看一看這個例子。

https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java

+0

保羅嗨,我如何能夠創建用於實現觀測器的一類退訂監聽器? – codependent

+0

看看這裏的最後一個測試例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/creating/ObservableSubscription.java – paul