2014-09-28 68 views
0

我是rxjava的新手,我有以下問題:供給Observable的值

對象被外部系統不規則地放入FIFO隊列中。我需要一個每秒運行的Observable,從隊列中取出一個項目(如果有的話)併發送給訂閱者。

兩個問題:

  • 隊列中的項目中產生,同時可觀察是活的,它是不可能提供前期的所有項目。隊列可能爲空,在這種情況下,Observable必須處於待機狀態並且不會發射任何東西。 (如果Observable會在暫停後的隊列中有一個項目可用時立即啓動,但如果我們不想更頻繁地輪詢,那麼該隊列可能也需要是Observable想法如何。)

  • 外部系統必須有可能完成Observable。我可以設置一個變量並從Observable中讀取它,但是我想知道是否有更好的方法來實現這一點。

    LinkedList<Layer> queue = new LinkedList<Layer>(); // the queue 
    boolean stopObservable = false; // the variable to stop the observable 
    
    Observable.create(new Observable.OnSubscribe<Layer>() { 
    
        @Override public void call(Subscriber<? super Layer> subscriber) { 
         try { 
          if (!queue.isEmpty()) { 
           Layer layer = queue.poll(); 
           subscriber.onNext(layer); 
          } else { 
           if (stopObservable) { subscriber.onCompleted(); } 
          } 
         } catch (Exception e) { 
          subscriber.onError(e); 
         } 
        } 
    
    }).somethingThatCreatesTheInterval().subscribeOnEtc. 
    

對於間隔,我不能使用。樣品(),因爲它下降的項目,所有的項目都發射是非常重要的。

.throttleWithTimeout()看起來更好,但它似乎也放棄了項目。

rx非常酷,但很難進入。任何輸入讚賞。

+0

所以,你不介意,如果輸入項目的隊列隨時間建立起來 - 你只是想發出一個每秒(或者乾脆跳過「插槽「如果輸入隊列中沒有可用項目)?我的第一本能是查看定時器(提供「脈衝」)和映射(它不映射任何東西,但只是丟棄定時器發出的每個Long,而是從輸入隊列發出下一個項目 - 或者調用onCompleted if停止變量設置爲true)。但也許有一個更優雅的替代方案... – 2014-09-28 09:20:51

+0

其實我想你可能需要在第二步中使用flatMap(而不是map) - 以便能夠處理輸入隊列爲空的情況。所以你要麼發出一個Observable.just()或者Observable.empty()。 – 2014-09-28 09:28:56

回答

1

當我需要定期輪詢外部Web服務時,我做了類似的事情。

  1. 對於你可以用timer出發的時間間隔;在與1組成的粒度可觀測鏈將輪詢,也許接一層,如果該層是空每個刻度則沒有發出

    Observable.timer(0, 1, TimeUnit.SECOND) 
        .flatMap(tick -> Observable.just(queue.poll()).filter(layer -> layer != null)) 
        .subscribe(layer -> System.out.format("The layer is : %s", layer)); 
    
  2. 現在如果你想終止整個您可以添加takeUntil鏈。所以,當你的外部系統要停止將提交在stopObservable東西,將停止後續訂閱:

    // somewhere before 
    PublishSubject stopNotifier = PublishSubject.create(); 
    
    // somewhere process the queue 
    Observable.timer(0, 1, TimeUnit.SECOND) 
        .takeUntil(stopNotifier) 
        .flatMap(tick -> Observable.just(queue.poll())) 
        .subscribe(layer -> System.out.format("The layer is : %s", layer)); 
    
    // when not anymore interested (calling onComplete works too) 
    stopNotifier.onNext("cancel everything about the queue"); 
    

我正在寫從平板電腦這種反應,所以你可能會認爲我可能有一些拼錯的單詞或做出天真的編程錯誤;)

0

如果可能,您應該使用PublishSubject<Layer>而不是LinkedList<Layer>。然後,外部系統可以通過調用publishSubject.onNext來提供新項目,並且由於PublishSubjectObservable的子類,因此您的系統可以將其視爲Observable,並且根據您希望的時間的語義,將這些運算符之一應用於它:

  • sample
  • debounce
  • throttleFirst/throttleLast/throttleWithTimeout
  • .zipWith(Observable.timer(1, TimeUnit.SECONDS), (value, tick) -> value)(!可能做了很多緩衝)
  • 根本沒有時間修改(認爲這是很好)
相關問題