我有以下代碼:RxJava:觀察到的,默認的線程
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
而這裏的輸出:
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
重要的細節:我打電話從另一個線程subscribe
方法(main
線程在Android中)。
所以看起來像Observable
類是同步的,在默認情況下,它發出事件(s.onNext
)在同一個線程執行的一切(運營商如map
+通知用戶),對不對?我想知道......它是有意的行爲還是我誤解了某些東西?其實我期望至少onNext
和onComplete
回調將在調用者的線程上被調用,而不是在發送事件的那個上。我是否正確理解在這種特殊情況下實際的調用者線程無關緊要?至少在異步生成事件時。
另一個值得關注的 - 如果我收到了一些可觀察到從一些外部源的參數(即我不產生對我自己)......有沒有辦法對我來說它的用戶檢查其是否是同步或異步,我只需明確指定我想通過subscribeOn
和observeOn
方法接收回調的位置,對吧?
謝謝!