2014-03-07 77 views
4

我目前正努力嘗試使用rx實現tcp看門狗/重試系統,您的幫助將不勝感激。rx-java中的套接字看門狗

有一個Observable,我想有一個Observable定期檢查我們是否仍然可以寫入套接字。很容易,我可以做這樣的事情:

class SocketSubscribeFunc implements Observable.OnSubscribeFunc<Socket> { 
    private final String hostname; 
    private final int port; 
    private Socket socket; 

    SocketSubscribeFunc(String hostname, int port) { 
    this.hostname = hostname; 
    this.port = port; 
    } 

    public Subscription onSubscribe(final Observer<? super Socket> observer) { 
    try { 
     log.debug("Trying to connect..."); 
     socket = new Socket(hostname, port); 
     observer.onNext(socket); 
    } catch (IOException e) { 
     observer.onError(e); 
    } 
    return new Subscription() { 
     public void unsubscribe() { 
     try { 
      socket.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
     } 
    }; 
    } 
} 

Observable<Socket> socketObservable = Observable.create(new SocketSubscribeFunc(hostname,port)); 
Observable<Boolean> watchdog = Observable.combineLatest(socketObservable, Observable.interval(1, TimeUnit.SECONDS), new Func2<Socket, Long, Boolean>() { 

    public Boolean call(final Socket socket, final Long aLong) { 
    try { 
     socket.getOutputStream().write("ping\n".getBytes()); 
     return true; 
    } catch (IOException e) { 
    return false; 
    } 
    } 
}); 

現在,我要重新連接,如果插座可以獲取(服務器/鏈路斷開時產生),或變得不可寫(服務器/鏈接成功後可達連接)。 理想情況下,通過重新訂閱其OnSubscribeFunc使用重試運算符創建連接的套接字Observable。如您所見,這將在套接字和監視器Observables之間引入一個循環依賴關係。 我玩了一會兒,用switchMap/materialize ...爲了傳播最終的錯誤無濟於事。

我接近了放棄這個想法,並使用副作用代碼副作用。但在全球的某個角落應該有更好的辦法:)

在此先感謝!

回答

6

首先,大多數時候我會避免Observable.create,因爲它通常不需要,並且會帶來不必要的複雜性。在這種情況下,Rx有一個名爲using的運算符,它允許您創建Observable生命期中存在的資源對象。它自動捕獲運行時錯誤,並且還提供了一個處理動作,所以這對於這個用例中的套接字來說是完美的。我使用Java8 lambda表達式,因爲它們更容易僞代碼。

Observable.using(
    // Resource (socket) factory 
    () -> { 
     try { 
     return new Socket(hostname, port); 
     } catch (IOException e) { 
     // Rx will propagate this as an onError event. 
     throw new RuntimeException(e); 
     } 
    }, 
    // Observable factory 
    (socket) -> { 
     return Observable.interval(1, TimeUnit.SECONDS) 
      .map((unusedTick) { 
      try { 
       socket.getOutputStream().write("ping\n".getBytes()); 
       return true; 
      } catch (IOException e) { 
       throw new RuntimeException(e); 
      } 
      }) 
      // Retry the inner job up to 3 times before propagating. 
      .retry(3); 
    }, 
    // Dispose action for socket. 
    // In real life the close probably needs a try/catch. 
    (socket) -> socket.close()) 
    // Retry the outer job up to 3 times. 
    .retry(3) 
    // If we propagate all errors, emit a 'false', signaling service is not available. 
    .onErrorResumeNext(Observable.just(false)); 

注意,這將重試外工作,如果內一個傳播(後3次失敗)。爲了解決這個問題,你應該檢查出重試的文檔與謂詞以及retryWhen。如果這不是內部作業傳播的類型,則可以拋出一個特殊的RuntimeException並僅重試外部作業。

using文檔:http://reactivex.io/RxJava/javadoc/rx/Observable.html#using(rx.functions.Func0,%20rx.functions.Func1,%20rx.functions.Action1)