我有2個流,第一個流是從數據庫獲取數據並在完成數據後調用onCompleted()
的流。第二個流是從服務器獲取實時數據的流,並且不會調用onCompleted()
。我想要做的是創建一個操作符,如果第一個流(上游)是空流,就可以執行操作。下面是示例:在RxJava中創建doIfEmpty運算符
getItemFromDatabase()
.lift(new DoIfEmptyOperator<Item>(new Action0() {
@Override
public void call() {
//Database is empty
System.out.println("Yeay successfully do an action");
}
}))
.concatWith(getItemFromServer()) // -----> intentionally never complete
.subscribe(new Subscriber<StoryItem>() {
@Override
public void onCompleted() {
//dosomething...
}
@Override
public void onError(Throwable e) {
//dosomething...
}
@Override
public void onNext(StoryItem storyItem) {
//dosomething
}
}));
下面是DoIfEmptyOperator的代碼:
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
public class DoIfEmptyOperator<T> implements Observable.Operator<T,T>{
private Action0 action;
private boolean isEmpty = true;
public DoIfEmptyOperator(Action0 action) {
this.action = action;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> childSubscriber) {
Subscriber<T> parentSubscriber = new Subscriber<T>() {
@Override
public void onCompleted() {
if(isEmpty) {
action.call();
}
childSubscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
childSubscriber.onError(e);
}
@Override
public void onNext(T t) {
isEmpty = false;
childSubscriber.onNext(t);
}
};
childSubscriber.add(parentSubscriber);
return parentSubscriber;
}
}
但是因爲parentSubscriber onCompleted()
在不觸發永遠不會執行的動作,由於下游沒有完成。如果我刪除
.concatWith(getItemFromServer())
,則執行該操作。有關如何解決問題的任何線索?我已經潛入Observable.switchIfEmpty()的源代碼,但仍然不知道它是如何工作的。
啊從來沒有想過這個,它的工作原理,謝謝:) – SalacceoVanz