2016-09-25 35 views
16

我想創造可觀那些如下:RXJava - 做一個可暫停觀察到的(有緩衝和窗口爲例)

  • 緩衝區中的所有項目,而在暫停
  • 立即發出商品,而他們是沒有暫停
  • 暫停/恢復觸發必須來自另一個觀察的
  • 它必須被保存到不主線程上運行的觀測中使用,它必須保存在主線程改變暫停/恢復狀態

我想使用BehaviorSubject<Boolean>作爲觸發器,並將此觸發器綁定到活動的onResume和事件。 (代碼示例附加)

問題

我設置的東西,但預期它不工作。我使用它像以下:

Observable o = ...; 
// Variant 1 
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue()) 
// Variant 2 
// o = o.compose(RXPauser.applyPauser(getPauser())); 
o 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(); 

目前的問題是,變體1應該可以正常工作,但有時,這些事件只是沒有發出 - 閥門不發光時,直到閥門一切工作(可能是一個線程問題......)!解決方案2非常簡單,似乎可行,但我不確定它是否真的更好,我不這麼認爲。實際上,我不知道,爲什麼一個解決方案,有時失敗,所以我不知道,如果解決方案2解決了(目前爲我未知)的問題...

可有人告訴我可能是什麼問題,或者如果簡單的解決方案應該可靠工或者給我一個可靠的解決方案?

代碼

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

RXPauser功能

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser) 
{ 
    return observable -> pauser(observable, pauser); 
} 

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser) 
{ 
    // this observable buffers all items that are emitted while emission is paused 
    Observable<T> sharedSource = source.publish().refCount(); 
    Observable<T> queue = sharedSource 
      .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed)) 
      .flatMap(l -> Observable.from(l)) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t)); 

    // this observable emits all items that are emitted while emission is not paused 
    Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed)) 
      .switchMap(tObservable -> tObservable) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t)); 

    // combine both observables 
    return queue.mergeWith(window) 
      .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t)); 
} 

活動

public class BaseActivity extends AppCompatActivity { 

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false); 

    public BaseActivity(Bundle savedInstanceState) 
    { 
     super(args); 
     final Class<?> clazz = this.getClass(); 
     pauser 
       .doOnUnsubscribe(() -> { 
        L.d(clazz, "Pauser unsubscribed!"); 
       }) 
       .subscribe(aBoolean -> { 
        L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED")); 
       }); 
    } 

    public PublishSubject<Boolean> getPauser() 
    { 
     return pauser; 
    } 

    @Override 
    protected void onResume() 
    { 
     super.onResume(); 
     pauser.onNext(true); 
    } 

    @Override 
    protected void onPause() 
    { 
     pauser.onNext(false); 
     super.onPause(); 
    } 
} 
+0

人們試圖回答這個問題的,到目前爲止,缺少一個重要的要求,即在這個問題說得很清楚: _」暫停/恢復觸發必須來自另一個可觀察的「_」。 他們不想要一個固定的時間表。 –

回答

3

實際上,你可以使用.buffer()運營商通過它可觀察,確定何時從書停止緩衝,樣品:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10) 
    .buffer(Observable.interval(250, TimeUnit.MILLISECONDS)) 
    .subscribe(System.out::println); 

從第5章, '馴服' 序列:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md

您可以使用PublishSubject作爲Observable爲您的自定義運算符中的元素提供它。每當你需要開始緩衝時間,通過Observable.defer(() -> createBufferingValve())

2

我做了類似的事情記錄事件創建實例。
對象收集一些事件,並在10秒鐘內一次將它們推送到服務器。

其主要思想是,例如,您擁有類Event

public class Event { 

    public String jsonData; 

    public String getJsonData() { 
     return jsonData; 
    } 

    public Event setJsonData(String jsonData) { 
     this.jsonData = jsonData; 
     return this; 
    } 
} 

你應該爲事件創建隊列:

private PublishSubject<Event> eventQueue = PublishSubject.create(); 

它可以是BehaviorSubject,不要緊

那麼你應該創建邏輯,將處理事件推到服務器:

eventObservable = eventQueue.asObservable() 
      .buffer(10, TimeUnit.SECONDS) //flush events every 10 seconds 
      .toList() 
      .doOnNext(new Action1<List<Event>>() { 
       @Override 
       public void call(List<Event> events) { 
        apiClient.pushEvents(events);  //push your event 
       } 
      }) 
      .onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() { 
       @Override 
       public Observable<List<Event>> call(Throwable throwable) { 
        return null; //make sure, that on error will be never called 
       } 
      }) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(Schedulers.io()); 

然後你應該訂閱它,並保留subsc ription,直到你不需要它:

eventSubscription = eventObservable.subscribe() 

首頁這有助於