2
我想實現以下目標:RxJava訂閱阻擋觀察到
String result = myObservable.toBlocking().first();
即,它像一個普通的函數調用。然而,這從來不會發生,因爲你需要訂閱它,我不知道該怎麼做。如果我訂閱它,結果將在另一個範圍內,並且代碼非常難看,因爲我只能得到像常規觀察值一樣的結果,所以沒有必要將它變成阻塞的可觀察值。
我想實現以下目標:RxJava訂閱阻擋觀察到
String result = myObservable.toBlocking().first();
即,它像一個普通的函數調用。然而,這從來不會發生,因爲你需要訂閱它,我不知道該怎麼做。如果我訂閱它,結果將在另一個範圍內,並且代碼非常難看,因爲我只能得到像常規觀察值一樣的結果,所以沒有必要將它變成阻塞的可觀察值。
它的實際工作,只要你想:
Observable<String> myObservable = Observable.just("firstValue", "secondValue");
String result = myObservable.toBlocking().first();
System.out.println(result); // ---> "firstValue"
引擎蓋下,調用BlockingObservable.first()
並訂閱爲您提供:
private T blockForSingle(final Observable<? extends T> observable) {
final AtomicReference<T> returnItem = new AtomicReference<T>();
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
@SuppressWarnings("unchecked")
Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(final Throwable e) {
returnException.set(e);
latch.countDown();
}
@Override
public void onNext(final T item) {
returnItem.set(item);
}
});
BlockingUtils.awaitForComplete(latch, subscription);
if (returnException.get() != null) {
Exceptions.propagate(returnException.get());
}
return returnItem.get();
}
UPDATE:如果沒有任何意義使用一個BehaviourSubject
加toBlocking()
。考慮到它是和Observable
和Observer
所以在某處應該調用myObservable.onNext("value")
。如果您通過調用toBlocking()
來阻止該線程,除非myObservable
在其他調用onNext()
的其他線程中可用,否則您將被阻止。
舉例來說,這是正常使用`BehaviourSubject的:
// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
有趣的,我忘了這是一個BehaviorSubject其實..沒有制定有所作爲?它無限期地阻止了我的整個線程。 – breakline