2016-09-16 131 views
2

我想實現以下目標:RxJava訂閱阻擋觀察到

String result = myObservable.toBlocking().first(); 

即,它像一個普通的函數調用。然而,這從來不會發生,因爲你需要訂閱它,我不知道該怎麼做。如果我訂閱它,結果將在另一個範圍內,並且代碼非常難看,因爲我只能得到像常規觀察值一樣的結果,所以沒有必要將它變成阻塞的可觀察值。

回答

1

它的實際工作,只要你想:

Observable<String> myObservable = Observable.just("firstValue", "secondValue"); 
    String result = myObservable.toBlocking().first(); 
    System.out.println(result); // ---> "firstValue" 

引擎蓋下,調用BlockingObservable.first()並訂閱爲您提供:

private T blockForSingle(final Observable<? extends T> observable) { 
    final AtomicReference<T> returnItem = new AtomicReference<T>(); 
    final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>(); 
    final CountDownLatch latch = new CountDownLatch(1); 

    @SuppressWarnings("unchecked") 
    Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() { 
     @Override 
     public void onCompleted() { 
      latch.countDown(); 
     } 

     @Override 
     public void onError(final Throwable e) { 
      returnException.set(e); 
      latch.countDown(); 
     } 

     @Override 
     public void onNext(final T item) { 
      returnItem.set(item); 
     } 
    }); 
    BlockingUtils.awaitForComplete(latch, subscription); 

    if (returnException.get() != null) { 
     Exceptions.propagate(returnException.get()); 
    } 

    return returnItem.get(); 
} 

UPDATE:如果沒有任何意義使用一個BehaviourSubjecttoBlocking()。考慮到它是和ObservableObserver所以在某處應該調用myObservable.onNext("value")。如果您通過調用toBlocking()來阻止該線程,除非myObservable在其他調用onNext()的其他線程中可用,否則您將被阻止。

舉例來說,這是正常使用`BehaviourSubject的:

// observer will receive the "one", "two" and "three" events, but not "zero" 
    BehaviorSubject<Object> subject = BehaviorSubject.create("default"); 
    subject.onNext("zero"); 
    subject.onNext("one"); 
    subject.subscribe(observer); 
    subject.onNext("two"); 
    subject.onNext("three"); 
+0

有趣的,我忘了這是一個BehaviorSubject其實..沒有制定有所作爲?它無限期地阻止了我的整個線程。 – breakline