我想創造可觀那些如下:RXJava - 做一個可暫停觀察到的(有緩衝和窗口爲例)
- 緩衝區中的所有項目,而在暫停
- 立即發出商品,而他們是沒有暫停
- 暫停/恢復觸發必須來自另一個觀察的
- 它必須被保存到不主線程上運行的觀測中使用,它必須保存在主線程改變暫停/恢復狀態
我想使用BehaviorSubject<Boolean>
作爲觸發器,並將此觸發器綁定到活動的onResume
和事件。 (代碼示例附加)
問題
我設置的東西,但預期它不工作。我使用它像以下:
Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
目前的問題是,變體1應該可以正常工作,但有時,這些事件只是沒有發出 - 閥門不發光時,直到閥門一切工作(可能是一個線程問題......)!解決方案2非常簡單,似乎可行,但我不確定它是否真的更好,我不這麼認爲。實際上,我不知道,爲什麼一個解決方案,有時失敗,所以我不知道,如果解決方案2解決了(目前爲我未知)的問題...
可有人告訴我可能是什麼問題,或者如果簡單的解決方案應該可靠工或者給我一個可靠的解決方案?
代碼
RxValue
https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08
RXPauser功能
public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
return observable -> pauser(observable, pauser);
}
private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
// this observable buffers all items that are emitted while emission is paused
Observable<T> sharedSource = source.publish().refCount();
Observable<T> queue = sharedSource
.buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
.flatMap(l -> Observable.from(l))
.doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));
// this observable emits all items that are emitted while emission is not paused
Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
.switchMap(tObservable -> tObservable)
.doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));
// combine both observables
return queue.mergeWith(window)
.doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}
活動
public class BaseActivity extends AppCompatActivity {
private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);
public BaseActivity(Bundle savedInstanceState)
{
super(args);
final Class<?> clazz = this.getClass();
pauser
.doOnUnsubscribe(() -> {
L.d(clazz, "Pauser unsubscribed!");
})
.subscribe(aBoolean -> {
L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
});
}
public PublishSubject<Boolean> getPauser()
{
return pauser;
}
@Override
protected void onResume()
{
super.onResume();
pauser.onNext(true);
}
@Override
protected void onPause()
{
pauser.onNext(false);
super.onPause();
}
}
人們試圖回答這個問題的,到目前爲止,缺少一個重要的要求,即在這個問題說得很清楚: _」暫停/恢復觸發必須來自另一個可觀察的「_」。 他們不想要一個固定的時間表。 –