我目前正努力嘗試使用rx實現tcp看門狗/重試系統,您的幫助將不勝感激。rx-java中的套接字看門狗
有一個Observable,我想有一個Observable定期檢查我們是否仍然可以寫入套接字。很容易,我可以做這樣的事情:
class SocketSubscribeFunc implements Observable.OnSubscribeFunc<Socket> {
private final String hostname;
private final int port;
private Socket socket;
SocketSubscribeFunc(String hostname, int port) {
this.hostname = hostname;
this.port = port;
}
public Subscription onSubscribe(final Observer<? super Socket> observer) {
try {
log.debug("Trying to connect...");
socket = new Socket(hostname, port);
observer.onNext(socket);
} catch (IOException e) {
observer.onError(e);
}
return new Subscription() {
public void unsubscribe() {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
};
}
}
Observable<Socket> socketObservable = Observable.create(new SocketSubscribeFunc(hostname,port));
Observable<Boolean> watchdog = Observable.combineLatest(socketObservable, Observable.interval(1, TimeUnit.SECONDS), new Func2<Socket, Long, Boolean>() {
public Boolean call(final Socket socket, final Long aLong) {
try {
socket.getOutputStream().write("ping\n".getBytes());
return true;
} catch (IOException e) {
return false;
}
}
});
現在,我要重新連接,如果插座可以獲取(服務器/鏈路斷開時產生),或變得不可寫(服務器/鏈接成功後可達連接)。 理想情況下,通過重新訂閱其OnSubscribeFunc使用重試運算符創建連接的套接字Observable。如您所見,這將在套接字和監視器Observables之間引入一個循環依賴關係。 我玩了一會兒,用switchMap/materialize ...爲了傳播最終的錯誤無濟於事。
我接近了放棄這個想法,並使用副作用代碼副作用。但在全球的某個角落應該有更好的辦法:)
在此先感謝!