2016-05-03 71 views
7

我試圖獲取給定Observable的最新值,並在調用它後立即發出 。考慮下面的代碼爲例:獲取Observable的最新值並立即發出

return Observable.just(myObservable.last()) 
    .flatMap(myObservable1 -> { 
     return myObservable1; 
    }) 
    .map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object 

這並不是因爲這樣做的flatMap會發出myObservable1這反過來將有 發出到達map工作。 我不知道是否可以做這樣的事情。有沒有人對如何實現這一目標有任何線索?謝謝

+0

觀察到什麼,你說什麼?最新是什麼意思? – akarnokd

+0

'myObservable'是熱可觀察到的,其發射例如:「1」,「2」,「3」以不規則的間隔。我想要做的是能在時間去'myObservable'最新值(「3」在我們的情況下)我到了'return'指令 –

+0

是myObservable過熱或過冷? –

回答

17

last()方法在這裏沒有任何幫助,因爲它等待Observable終止給你發出最後一個項目。

假設您沒有對發射可觀測量的控制,您可以簡單地創建一個BehaviorSubject並將其訂閱到發出您想要偵聽的數據的可觀察事件,然後訂閱創建的主題。由於Subject既是Observable也是Subscriber你會得到你想要的。

我認爲(現在沒有時間檢查它),您可能必須手動取消訂閱原始的可觀察項,如BehaviorSubject一旦其所有訂閱者取消訂閱都不會自動取消訂閱。

事情是這樣的:

BehaviorSubject subject = new BehaviorSubject(); 
hotObservable.subscribe(subject); 
subject.subscribe(thing -> { 
    // Here just after subscribing 
    // you will receive the last emitted item, if there was any. 
    // You can also always supply the first item to the behavior subject 
}); 

http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html

0

在RxJava,subscriber.onXXX被稱爲asynchronous.It意味着,如果在新線程您可觀察EMIT的項目,你永遠無法得到前的最後一個項目回報,除非你阻塞線程和等待item.But如果可觀察EMIT項目同步,你不要」改變它的線程通過subscribeOn和observOn,作爲代碼, :

Observable.just(1,2,3).subscribe(); 

在這種情況下,你可以做這樣得到最後一個項目:

Integer getLast(Observable<Integer> o){ 
    final int[] ret = new int[1]; 
    Observable.last().subscribe(i -> ret[0] = i); 
    return ret[0]; 
} 

這是一個壞主意做這樣this.RxJava喜歡你的話是做異步工作。

+1

都不起作用 – ndori

0

實際上你想在這裏實現的是採取異步任務並將其轉換爲同步任務。

有幾種方法來實現它,每一個與它的優點和缺點:

  • 使用toBlocking() - 這意味着該線程將被阻塞,直到流結束,爲了得到只有一個物品只需使用first(),因爲一旦物品交付,它就會完成。 假設你的整個流是Observable<T> getData(); 那麼就會立即獲得最後的值的方法如下所示:

public T getLastItem(){ return getData().toBlocking().first(); }

請不要使用last(),因爲它會等待流完成然後纔會發出最後一個項目。

如果你的流是一個網絡請求,但它還沒有得到任何項目,這將阻止你的線程,所以只有當你確定有一個項目立即可用(或者如果你真的想要一個塊...)

  • 另一種選擇是簡單地緩存最後的結果,這樣的事情:

    的getData()認購(T-> cachedT = T;)//某處代碼它會繼續保存的最後一個項目交付 公共牛逼getLastItem(){ 回報cachedT; }

,如果沒有通過,你要求它,你會得到空或任何初始值,你已經設置的時間發送的任何項目。 這種計算策略的問題是,認購階段可能的get發生後,如果在2級不同的線程使用可能使競爭條件。

相關問題