2015-09-12 118 views
0

假設以下情況,對問題的最佳解決方案是什麼?RxJava從另一位觀察員失敗

我有兩個流,一個代表TCP連接,另一個代表TCP連接的狀態。 一旦狀態改變(即斷開連接),我想重新獲得TCP連接。

我最初的想法是讓這兩個流合併它們並將retryWith應用於生成的Observable。第二個流是PublishSubject的一個實例,它給了我一個非常方便的失敗方法。現在,除了在發佈服務器上調用onError()時,連接流(#1)保持訂閱/取消訂閱,直到如果超出retryWhen設置的限制時,該想法部分起作用。

我確定這個問題在過去一定已經解決了,因此你想保持一個TCP連接並運行,我不知道如何從這裏進步。任何幫助,將不勝感激。

+0

通常TCP連接不會僅僅關閉自身,除非高層協議需要它。 TCP已經有了一些錯誤糾正機制,而它本身無法解決的任何問題通常都是一個真正的網絡問題。你確定,你不想在這裏依賴成熟的低層機制嗎? –

+0

@SebastianS如果出現網絡中斷,則TCP將斷開連接。就我所知,在這個特定的實現中,在網絡再次可用後,它不會重新連接。 – travikk

回答

0

你不需要兩個流。只用一個。典型的構造將包括Observable.using()來創建套接字,在該套接字上建立可觀察對象並處理關閉套接字,然後通過retry()(通常延遲)將其鏈接起來。

+0

Observable如何知道何時重試?你能提供一個例子嗎? – travikk

0
return Observable.using(new Func0<XMPPTCPConnection>() { 
     @Override 
     public XMPPTCPConnection call() { 
      L.log("creating connection"); 
      connection = _createConnection(); 
      return connection; 
     } 
    }, new Func1<XMPPTCPConnection, Observable<?>>() { 
     @Override 
     public Observable<?> call(final XMPPTCPConnection connection) { 

      try { 
       _authenticate(connection, username, password); 
      } catch (Exception e) { 
       throw new RuntimeException(e); 
      } 

      return Observable.just(connection) 
        .repeatWhen(new RepeatWhenOperator(-1, 1000)) 
        .map(new Func1<XMPPTCPConnection, Object>() { 
         @Override 
         public Object call(XMPPTCPConnection connection) { 
          if (!connection.isConnected()) { 
           throw new RuntimeException("Disconnected"); 
          } 

          return null; 
         } 
        }); 
     } 
    }, new Action1<XMPPTCPConnection>() { 
     @Override 
     public void call(XMPPTCPConnection connection) { 
      L.log("disposing"); 
      _disconnect(); 
     } 
    }) 
      .subscribeOn(Schedulers.io()) 
      .unsubscribeOn(Schedulers.io()) 
      .retry(); 

好的,所以基於Observable.using()我想出了這個解決方案。它可以工作,但我對解決方案不滿意 - 即每秒查詢連接是否仍然存在。 XMPPTCPConnection爲我提供了一個關於連接何時死亡的監聽器,這似乎是一個更好的解決方案 - 只是不知道如何合併...

_disconnect()方法實際上意味着斷開連接並處理;在當前情況下它可能是一個可憐的名字,但導致拋出RuntimeException的斷開發生在這個系統之外。

任何想法或改進將大大appriciated!

+0

_disconnect應該以某種方式引用連接。我假設你已經故意留下了一些代碼。就RepeatWhen部分而言,我並不期望在那裏看到。我無法對XMPPTCPConnection發表評論,因爲我不熟悉它,但除了使用正常的TCP連接以及延遲重試之外,我還使用'.timeout',以便只有在連接安靜的情況下,我們才能在一段時間後重新連接。要做到這一點,只需在最後一個'.retry()'之前放一個'.timeout'調用。 –

+0

@DaveMoten啊我看到了,我假設你定期收到一個ping,然後你發出一個項目;如果ping不通過,則意味着連接斷開 - 我的理解是否正確?謝謝 – travikk

+0

是的,可以執行ping並在.timeout()之後將其過濾出來,或者如果連接足夠忙,則只需依靠正常流量 –