所以我有這樣的代碼:RxJava鏈式觀測量和NetworkMainThreadException
public Observable<AbstractXMPPConnection> connect(final AbstractXMPPConnection connection) {
return Observable.<AbstractXMPPConnection>create(subscriber -> {
try {
AbstractXMPPConnection connection2 = connection.connect();
if (connection2.isConnected()) {
subscriber.onNext(connection2);
subscriber.onCompleted();
}
} catch (SmackException | IOException | XMPPException e) {
e.printStackTrace();
subscriber.onError(e);
}
})
.doOnError(throwable -> LOGI("111", "Connection OnError called"));
}
public Observable<AbstractXMPPConnection> connectWithRetry(final AbstractXMPPConnection connection) {
return connect(connection)
.retryWhen(attempts -> attempts.zipWith(Observable.range(1, MAX_CONNECTION_TRIES), (throwable, integer) -> new Pair<>(throwable, integer))
.flatMap(pair -> {
if (pair.second == MAX_LOGIN_TRIES)
return Observable.error(pair.first);
return Observable.timer(pair.second, TimeUnit.SECONDS);
}));
}
public void connect() {
assertTrue("To start a connection to the server, you must first call init() method!",
this.connectionConfig != null);
connectionHelper.connectWithRetry(connection)
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<AbstractXMPPConnection>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
LOGI(TAG, "ConnectionHelper Connection onError\n");
/**{@link LoginActivity#onConnectionFailure(OnConnectionFailureEvent)} */
MainApplication.getInstance().getBusInstance().post(new OnConnectionFailureEvent());
}
@Override
public void onNext(AbstractXMPPConnection connection) {
LOGI(TAG, "ConnectionHelper Connection onNext");
// onConnected();
}
});
}
我有鏈接可觀察的一些問題。想象這種場景,其中我有一個連接Observable,有時候我使用它,但我主要使用的是可觀察的connectWithRetry()
。
我的問題是,如果添加了此會發生什麼:
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
到connect()
和connectWithRetry()
兩者兼而有之?在這種情況下,當我撥打 public void connect並指定一個調度程序時,以前的那些會被忽略?
而我爲什麼得到NetworkOnMainThreadException
?明確的observeOn(Schedulers.newThread())
在那裏,它不應該給我那個錯誤
好吧,所以考慮到我爲每個observable指定了兩個調度器,而第三個調度器是鏈接的,這是錯誤的嗎?或者我應該只是簡單地指定調度器只在公共無效連接?虐待接受你的答案,謝謝你的幫助 –
我已經用一些例子更新了答案 - 這真的取決於你放在哪裏!不過,我不確定你對第三個人的意思。 –
好的感謝額外的信息,我欣賞它:P我的意見,名字很差的選擇。我的意思是,Observable,觀察員,Subscriber實施Observer。它就像subscribeOn是主要的問題,因爲考慮當你訂閱一個observable時,你指定了你想要對結果做什麼,subscribeOn誤導我們「在你想用結果做什麼線程中」。 Imho –