訂閱取消訂閱時,執行某些邏輯時遇到了一些麻煩。我已經呆了好幾個小時了,到目前爲止我的進展很小。這是我的代碼的簡化版本:取消訂閱時清除資源
public class Command<E> {
public CommandActionObservable execute() {
final CommandAction<E> command = createCommand();
final OnSubscribe<CommandAction<E>> onSubscribe = (subscriber) -> {
/* Create a listener that handles notifications and register it.
* The idea here is to push the command downstream so it can be re-executed
*/
final Listener listener = (event) -> {
subscriber.onNext(command);
}
registerListener(listener);
/* This is where I'm having trouble. The unregister method
* should be executed when the subscriber unsubscribed,
* but it never happens
*/
subscriber.add(Subscriptions.create(() -> {
unregisterListener(listener);
}));
// pass the initial command downstream
subscriber.onNext(command);
kickOffBackgroundAction();
}
final Observable<CommandAction<E>> actionObservable = Observable.create(onSubscribe)
.onBackpressureLatest()
.observeOn(Shedulers.io())
.onBackpressureLatest();
return new CommandActionObservable((subscriber) -> {
actionObservable.unsafeSubscribe(subscriber);
})
}
public class CommandActionObservable extends Observable<CommandAction<E> {
// default constructor omitted
public Observable<E> toResult() {
return lift((Operator) (subscriber) -> {
return new Subscriber<CommandAction<E>>() {
// delegate onCompleted and onError to subscriber
public void onNext(CommandAction<E> action) {
// execute the action and pass the result downstream
final E result = action.execute();
subscriber.onNext(result)
}
}
}
}
}
}
我使用Command
以通常的方式,將所產生的訂製了CompositeSubscription
和onDestroy()
從中退訂。下面是一個例子:
final Observable<SomeType> obs = new Command<SomeType>()
.execute()
.toResult();
subscription.add(obs.subscribe(// impl here));
public void onDestroy() {
super.onDestroy();
subscription.unsubscribe();
}
如上所述,我不能得到的退訂邏輯工作,並註銷監聽器,這會導致內存泄漏的應用程序。如果我在obs
上調用doOnUnsubscribe()
,它會被調用,所以我沒有正確標記,但是可能的觀察對象的嵌套和提升會導致一些問題。
我很樂意就此發表意見。