我有一個由幾個互連組件組成的系統。一切都很好,但經過一段時間後,一些觀察者停止接收觀察者的onNext()發送的項目。RxJava - 觀察者不會在onNext()項目後
一個簡化的情況是這樣的:我有 Component1.start()
- >創建一個ConnectableObservable與Observable.create(...).subscribeOn().observeOn().publish()
,並訂閱Component2。之後,它連接()s。這個可觀察對象在一個循環中發射一些項目,然後在完成時調用s.onComplete()
。
Component2實現Observer。另外它有一個ConnectableObservable,它運行一個while(true)循環。當它在其onNext()中獲取值(由Component1調用)時,它會使用自己的ConnectableObservable通知Component0。 (注意我也使用PublishSubject實現了它們,發生的情況也是如此)。
Component1.start() //Creates Component1's ConnectableObservable, subscribes Component2 and starts running with connect();
Component1.connectableObservable -> onNext() ---> Component2
Component2.connectableObservable -> onNext() ---> Component0
當Component0.onNext()獲取特定項目(經過100次迭代),它停止Component1.observable,使其退出了循環,並呼籲的onComplete()。
過了一段時間,Component0調用Component1.start(),並且所有事情都重新開始。
我所看到的是,當一切正常 Component1.observable.onNext()調用rx.internal.operators.OperatorSubscribeOn.......subscriber.onNext()
rx.internal.operators.OperatorSubscribeOn
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
subscriber.onNext()是內部類private static final class ObserveOnSubscriber<T>
並在這裏結束了調用時間表():
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
時間表()是
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
計數器爲0,因此recursiveScheduler.schedule(this);
被調用且Component2獲取該項目。
現在,當它停止工作時會發生的事情是計數器不再是0,實際上每次調用都會增加它。因此,recursiveScheduler.schedule(this);
永遠不會被調用,Component2不會得到任何東西。
這可能是什麼原因造成的?爲什麼計數器0在某個點開始增加?
UPDATE:時間表()被調用後,有計劃的任務調用下面的代碼,減少櫃檯當物品沒有丟失:
在源代碼中,我已經看到下面挖private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
// only execute this from schedule()
@Override
public void call() {
...
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
據此,由於項目被遺漏,計數器增加,並且隨後的項目也被遺漏。
項目被遺漏的原因是什麼?
我注意到一些奇怪的東西。如果我從程序中刪除任何其他(未提及)的可觀察項目,則不會漏掉任何項目。他們將Component0視爲觀察者,並在其自己的subscribeOn()線程中生成它們的項目,所以我看不到它們如何影響此場景。
更新2:我已經把試圖找出發生了什麼。當我做Component1.connectableObservable.connect()
,它最終調用private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 -> init()
這裏的時間表()被調用:
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
正確的做法留下OperatorObserveOn.counter = 0時間表後()。當它不再工作時,scheduler()會將OperatorObserveOn.counter的值增加1。
保羅嗨,我如何能夠創建用於實現觀測器的一類退訂監聽器? – codependent
看看這裏的最後一個測試例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/creating/ObservableSubscription.java – paul