我試圖從「斷開連接」部分here複製示例代碼。RxJava - ConnectableObservable,斷開連接並重新連接
斷開
正如我們在連接的簽名看到,這個方法返回一個訂閱,就像Observable.subscribe一樣。您可以使用該引用來終止ConnectableObservable的訂閱。這將阻止事件傳播給觀察者,但它不會從ConnectableObservable中取消訂閱。 如果再次調用connect,ConnectableObservable將啓動一個新的訂閱,舊觀察者將再次開始接收值。
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Subscription s = connectable.connect();
connectable.subscribe(i -> System.out.println(i));
Thread.sleep(1000);
System.out.println("Closing connection");
s.unsubscribe();
Thread.sleep(1000);
System.out.println("Reconnecting");
s = connectable.connect();
輸出
0
1
2
3
4
Closing connection
Reconnecting
0
1
2
...
使用RxJava 2.0.8,我有:
ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish();
Disposable s = connectable.connect();
connectable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d("test", "Num: " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Closing connection");
s.dispose();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("test", "Reconnecting...");
connectable.connect();
輸出
Num: 0
Num: 1
Num: 2
Num: 3
Num: 4
Closing connection
Reconnecting...
在此先感謝....
我不會理解您的問題 – Cochi
@Cochi在我的代碼中,我的訂閱服務器在可連接源斷開連接後再次連接後沒有收到值。 – veritas1