我是rxjava的新手,我有以下問題:供給Observable的值
對象被外部系統不規則地放入FIFO隊列中。我需要一個每秒運行的Observable,從隊列中取出一個項目(如果有的話)併發送給訂閱者。
兩個問題:
隊列中的項目中產生,同時可觀察是活的,它是不可能提供前期的所有項目。隊列可能爲空,在這種情況下,Observable必須處於待機狀態並且不會發射任何東西。 (如果Observable會在暫停後的隊列中有一個項目可用時立即啓動,但如果我們不想更頻繁地輪詢,那麼該隊列可能也需要是Observable想法如何。)
外部系統必須有可能完成Observable。我可以設置一個變量並從Observable中讀取它,但是我想知道是否有更好的方法來實現這一點。
LinkedList<Layer> queue = new LinkedList<Layer>(); // the queue boolean stopObservable = false; // the variable to stop the observable Observable.create(new Observable.OnSubscribe<Layer>() { @Override public void call(Subscriber<? super Layer> subscriber) { try { if (!queue.isEmpty()) { Layer layer = queue.poll(); subscriber.onNext(layer); } else { if (stopObservable) { subscriber.onCompleted(); } } } catch (Exception e) { subscriber.onError(e); } } }).somethingThatCreatesTheInterval().subscribeOnEtc.
對於間隔,我不能使用。樣品(),因爲它下降的項目,所有的項目都發射是非常重要的。
.throttleWithTimeout()看起來更好,但它似乎也放棄了項目。
rx非常酷,但很難進入。任何輸入讚賞。
所以,你不介意,如果輸入項目的隊列隨時間建立起來 - 你只是想發出一個每秒(或者乾脆跳過「插槽「如果輸入隊列中沒有可用項目)?我的第一本能是查看定時器(提供「脈衝」)和映射(它不映射任何東西,但只是丟棄定時器發出的每個Long,而是從輸入隊列發出下一個項目 - 或者調用onCompleted if停止變量設置爲true)。但也許有一個更優雅的替代方案... – 2014-09-28 09:20:51
其實我想你可能需要在第二步中使用flatMap(而不是map) - 以便能夠處理輸入隊列爲空的情況。所以你要麼發出一個Observable.just()或者Observable.empty()。 – 2014-09-28 09:28:56