2016-10-21 80 views
2

訂閱取消訂閱時,執行某些邏輯時遇到了一些麻煩。我已經呆了好幾個小時了,到目前爲止我的進展很小。這是我的代碼的簡化版本:取消訂閱時清除資源

public class Command<E> { 

    public CommandActionObservable execute() { 
     final CommandAction<E> command = createCommand(); 

     final OnSubscribe<CommandAction<E>> onSubscribe = (subscriber) -> { 

      /* Create a listener that handles notifications and register it. 
      * The idea here is to push the command downstream so it can be re-executed 
      */ 
      final Listener listener = (event) -> { 
       subscriber.onNext(command); 
      } 
      registerListener(listener); 

      /* This is where I'm having trouble. The unregister method 
      * should be executed when the subscriber unsubscribed, 
      * but it never happens 
      */ 
      subscriber.add(Subscriptions.create(() -> { 
       unregisterListener(listener); 
      })); 

      // pass the initial command downstream 
      subscriber.onNext(command); 

      kickOffBackgroundAction();    
     } 

     final Observable<CommandAction<E>> actionObservable = Observable.create(onSubscribe) 
      .onBackpressureLatest() 
      .observeOn(Shedulers.io()) 
      .onBackpressureLatest(); 
     return new CommandActionObservable((subscriber) -> { 
      actionObservable.unsafeSubscribe(subscriber); 
     }) 
    } 

    public class CommandActionObservable extends Observable<CommandAction<E> { 

     // default constructor omitted 

     public Observable<E> toResult() { 
      return lift((Operator) (subscriber) -> { 
       return new Subscriber<CommandAction<E>>() { 
        // delegate onCompleted and onError to subscriber 

        public void onNext(CommandAction<E> action) { 
         // execute the action and pass the result downstream 
         final E result = action.execute(); 
         subscriber.onNext(result) 
        } 
       } 
      } 
     } 

    } 

} 

我使用Command以通常的方式,將所產生的訂製了CompositeSubscriptiononDestroy()從中退訂。下面是一個例子:

final Observable<SomeType> obs = new Command<SomeType>() 
             .execute() 
             .toResult(); 
subscription.add(obs.subscribe(// impl here)); 



public void onDestroy() { 
    super.onDestroy(); 
    subscription.unsubscribe(); 
} 

如上所述,我不能得到的退訂邏輯工作,並註銷監聽器,這會導致內存泄漏的應用程序。如果我在obs上調用doOnUnsubscribe(),它會被調用,所以我沒有正確標記,但是可能的觀察對象的嵌套和提升會導致一些問題。

我很樂意就此發表意見。

回答

1

原來,這比我預想的要容易得多。

經過一番挖掘,我能夠自己想出答案。只是發佈這可能會最終在我的相同情況下的人。

所以,正如我在我的問題中提到的,如果我向觀察值添加doOnSubscribe()動作,我得到了我的Activity,它會得到通知。接下來,我嘗試在​​方法中創建的內部Observables上添加相同的操作。他們沒有接到電話。所以,我得出這樣一個結論:連鎖在我的活動中的可觀察性與我在​​中創建的觀察結果之間的某個地方已經破裂。

唯一發生在流中的是我在toResult()中實現的自定義Operator的應用。 Google搜索後,我遇到了這篇優秀的文章 - Pitfalls of Operator Implementation。我確實在我的操作員中制動了鏈條,上游的可觀察者未被通知取消訂閱。

當我做了作者的建議後,一切都很好。這是我需要做的事情:

lift((Operator) (subscriber) -> { 
    // connect the upstream and downstream subscribers to keep the chain intact 
    new Subscriber<CommandAction<E>>(subscriber) { 
     // the implementation is the same 
    } 
}