2015-07-03 81 views
3

所以我有這樣的代碼:RxJava鏈式觀測量和NetworkMainThreadException

public Observable<AbstractXMPPConnection> connect(final AbstractXMPPConnection connection) { 
    return Observable.<AbstractXMPPConnection>create(subscriber -> { 
     try { 
      AbstractXMPPConnection connection2 = connection.connect(); 
      if (connection2.isConnected()) { 
       subscriber.onNext(connection2); 
       subscriber.onCompleted(); 
      } 
     } catch (SmackException | IOException | XMPPException e) { 
      e.printStackTrace(); 
      subscriber.onError(e); 
     } 
    }) 
    .doOnError(throwable -> LOGI("111", "Connection OnError called")); 
} 


public Observable<AbstractXMPPConnection> connectWithRetry(final AbstractXMPPConnection connection) { 
     return connect(connection) 
       .retryWhen(attempts -> attempts.zipWith(Observable.range(1, MAX_CONNECTION_TRIES), (throwable, integer) -> new Pair<>(throwable, integer)) 
         .flatMap(pair -> { 
          if (pair.second == MAX_LOGIN_TRIES) 
           return Observable.error(pair.first); 
          return Observable.timer(pair.second, TimeUnit.SECONDS); 
         })); 
    } 


public void connect() { 
     assertTrue("To start a connection to the server, you must first call init() method!", 
       this.connectionConfig != null); 

     connectionHelper.connectWithRetry(connection) 
       .observeOn(Schedulers.newThread()) 
       .subscribeOn(AndroidSchedulers.mainThread()) 
       .subscribe(new Subscriber<AbstractXMPPConnection>() { 
        @Override 
        public void onCompleted() { 
        } 

        @Override 
        public void onError(Throwable e) { 
         LOGI(TAG, "ConnectionHelper Connection onError\n"); 

         /**{@link LoginActivity#onConnectionFailure(OnConnectionFailureEvent)} */ 
         MainApplication.getInstance().getBusInstance().post(new OnConnectionFailureEvent()); 
        } 

        @Override 
        public void onNext(AbstractXMPPConnection connection) { 
         LOGI(TAG, "ConnectionHelper Connection onNext"); 
//      onConnected(); 
        } 
       }); 
    } 

我有鏈接可觀察的一些問題。想象這種場景,其中我有一個連接Observable,有時候我使用它,但我主要使用的是可觀察的connectWithRetry()

我的問題是,如果添加了此會發生什麼:

.observeOn(Schedulers.newThread()) 
.subscribeOn(AndroidSchedulers.mainThread()) 

connect()connectWithRetry()兩者兼而有之?在這種情況下,當我撥打 public void connect並指定一個調度程序時,以前的那些會被忽略?

而我爲什麼得到NetworkOnMainThreadException?明確的observeOn(Schedulers.newThread())在那裏,它不應該給我那個錯誤

回答

1

我會先解決你的NetworkOnMainThread問題。

observeOn(Schedulers.newThread())意味着輸出將在一個新的線程觀察 - 也就是說,在用戶(onComplete/Error/Next)的代碼將在該線程中運行。

subscribeOn(AndroidSchedulers.mainThread()意味着訂閱會發生在主線程 - 在代碼中您所創建的可觀測(connection.connect()等)是訂閱,會發生什麼運行。

所以,簡單地交換調度:

.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 

因此,解決第一個問題,他們沒有理會,他們只是被錯誤地使用。希望從這裏你可以看到如果你將類似的調用移動到你的方法中的鏈中,返回可觀察到的結果會發生什麼:沒有什麼不同,你已經做了什麼。電話會在不同的地方。

那麼在哪裏把調度選擇?這取決於你。您可以通過具有創建觀測方法裏面subscribeOn電話獲得更高的清晰度:

connectionHelper.connectWithRetry(connection) 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 

不過,如果你覺得你調用這個處處爲沒有理由,可以改爲移動subscribeOn打電話給你的方法裏面:

return connect(connection) 
      .retryWhen(...) 
      .flatMap(...) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()); 

注意,這些不必是這樣捆綁在一起了 - 你可以subscribeOn你的方法裏面,但離開observeOn高達希望他們在一個特定的蘇氨酸結果的任何呼叫者元首。

+0

好吧,所以考慮到我爲每個observable指定了兩個調度器,而第三個調度器是鏈接的,這是錯誤的嗎?或者我應該只是簡單地指定調度器只在公共無效連接?虐待接受你的答案,謝謝你的幫助 –

+0

我已經用一些例子更新了答案 - 這真的取決於你放在哪裏!不過,我不確定你對第三個人的意思。 –

+0

好的感謝額外的信息,我欣賞它:P我的意見,名字很差的選擇。我的意思是,Observable,觀察員,Subscriber實施Observer。它就像subscribeOn是主要的問題,因爲考慮當你訂閱一個observable時,你指定了你想要對結果做什麼,subscribeOn誤導我們「在你想用結果做什麼線程中」。 Imho –

0

請嘗試Schedulers.io()可能會解決問題。