2017-04-18 51 views
0

我有一些我並行執行的observable集合,如localObservablenetworkObservable。如果networkObservable開始發射物品(從此時起,我只需要這些物品),然後丟棄由localObservable發出的物品(也許localObservable尚未開始)。rxjava開關可觀察,如果第二個可觀察的開始發射項目

Observable<Integer> localObservable = 
      Observable.defer(() -> Observable.range(1, 10)).subscribeOn(Schedulers.io()); 
Observable<Integer> networkObservable = 
      Observable.defer(() -> Observable.range(11, 20)).subscribeOn(Schedulers.io()); 

回答

2

你可以做這樣的事情:

Observable<Long> networkObservable = 
      Observable.interval(1000, 500, TimeUnit.MILLISECONDS) 
        .subscribeOn(Schedulers.io()) 
        .share(); 
    Observable<Long> localObservable = 
      Observable.interval(500, TimeUnit.MILLISECONDS)      
        .subscribeOn(Schedulers.io()) 
        .takeUntil(networkObservable); 

    Observable.merge(networkObservable, localObservable) 
      .subscribe(System.out::println); 

這將輸出:

0 // localObservable 
1 // localObservable 
0 // networkObservable from here on 
1 
2 
... 

takeUntil會讓localObservable停止和取消時,從networkObservable第一發射發生,所以合併Observable將從localObservable發出只要networkObservable未啓動ed,當它發生時,它將停止從localObservable發射並切換爲僅從networkObservable發射。

+0

它的工作。感謝你的回答。 – xymelon

+0

如果'networkObservable'發出錯誤,我想繼續使用'localObservable',我該怎麼做? – xymelon

+0

你可以使用mergeDelayError(在這種情況下,你會在localObservable發射一些東西之後得到onError),或者只是用onErrorREsumeNext或類似的方法捕獲networkObservable的所有錯誤 – yosriz

0

有由運營商這樣的一個簡單的解決辦法:AMB

只要看看的System.out的輸出。

文檔:http://reactivex.io/documentation/operators/amb.html

基本上你訂閱觀察到兩者在同一時間和任何可觀察到的第一發射獲得通過。其他觀察對象將取消訂閱。

@Test 
public void ambTest() throws Exception { 
    TestScheduler testScheduler = new TestScheduler(); 

    Observable<Integer> network = Observable.timer(1000, TimeUnit.MILLISECONDS, testScheduler) 
       .concatMap(aLong -> Observable.just(1, 2, 3)) 
       .doOnSubscribe(disposable -> System.out.println("connect network")) 
       .doOnDispose(() -> System.out.println("dispose network")); 

    Observable<Integer> local = Observable.timer(500, TimeUnit.MILLISECONDS, testScheduler) 
       .concatMap(aLong -> Observable.just(4, 5, 6)) 
       .doOnSubscribe(disposable -> System.out.println("connect local")) 
       .doOnDispose(() -> System.out.println("dispose local")); 

    Observable<Integer> integerObservable = Observable.ambArray(network, local); 

    TestObserver<Integer> test = integerObservable.test(); 

    testScheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS); 

    test.assertValues(4, 5, 6); 

    testScheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS); 

    test.assertValues(4, 5, 6); 
} 
+0

這取決於需求,如果你想要從當網絡可觀測數據尚未發射時,局域可觀測量,則局域網將只選擇具有第一個發射的本地可觀測數據,並且永遠不會切換到網絡可觀測數據 – yosriz