2017-04-15 84 views
0

我試圖從「斷開連接」部分here複製示例代碼。RxJava - ConnectableObservable,斷開連接並重新連接

斷開

正如我們在連接的簽名看到,這個方法返回一個訂閱,就像Observable.subscribe一樣。您可以使用該引用來終止ConnectableObservable的訂閱。這將阻止事件傳播給觀察者,但它不會從ConnectableObservable中取消訂閱。 如果再次調用connect,ConnectableObservable將啓動一個新的訂閱,舊觀察者將再次開始接收值。

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); 
Subscription s = connectable.connect(); 

connectable.subscribe(i -> System.out.println(i)); 

Thread.sleep(1000); 
System.out.println("Closing connection"); 
s.unsubscribe(); 

Thread.sleep(1000); 
System.out.println("Reconnecting"); 
s = connectable.connect(); 

輸出

0 
1 
2 
3 
4 
Closing connection 
Reconnecting 
0 
1 
2 
... 

使用RxJava 2.0.8,我有:

ConnectableObservable<Long> connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); 
    Disposable s = connectable.connect(); 

    connectable.subscribe(new Observer<Long>() { 
     @Override 
     public void onSubscribe(Disposable d) { 

     } 

     @Override 
     public void onNext(Long aLong) { 
      Log.d("test", "Num: " + aLong); 
     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onComplete() { 

     } 
    }); 

    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    Log.d("test", "Closing connection"); 
    s.dispose(); 

    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    Log.d("test", "Reconnecting..."); 
    connectable.connect(); 

輸出

Num: 0 
Num: 1 
Num: 2 
Num: 3 
Num: 4 
Closing connection 
Reconnecting... 

在此先感謝....

+0

我不會理解您的問題 – Cochi

+0

@Cochi在我的代碼中,我的訂閱服務器在可連接源斷開連接後再次連接後沒有收到值。 – veritas1

回答

4

看來這種行爲還沒有被RxJava採用。工作示例來自Rx.NET。請參見https://github.com/ReactiveX/RxJava/issues/4771

+0

可能是的。我能夠使其工作的唯一時間是當我再次調用'connect()'之前'再次訂閱()'時。這是一旦'dispose()'被調用後應該如何工作? –

相關問題