2017-09-04 34 views
3

在調試rxJava網絡電話在一個應用程序停止rxJava觀察到鏈執行我碰到這樣一種情況,如果我們disposeclear處置對象由observables那麼只有第一observable鏈的訂閱返回得到處理而不是其他鏈接observablesflatMap上佈置

看一看下面的演示代碼片段:

CompositeDisposable testCompositeDisposal = new CompositeDisposable(); 

private void testLoadData() { 
    Disposable disposable = Observable.create(sbr -> { 
     for (int i = 0; i < 5; i++) { 
      Thread.sleep(3000); 
      Log.w("Debug: ", "First: " + i); 
      sbr.onNext(true); 
     } 
     sbr.onComplete(); 
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> { 
     for (int i = 0; i < 5; i++) { 
      Thread.sleep(3000); 
      Log.w("Debug: ", "Second: " + i); 
      sbr.onNext(true); 
     } 
     sbr.onComplete(); 
    })).doOnNext(value -> { 
     Log.w("Debug: ", "doONNext"); 
    }).doOnDispose(()-> { 
     Log.w("Debug: ", "doOnDispose: observable has been disposed"); 
    }).subscribe(); 

    testCompositeDisposal.add(disposable); 
} 

@Override 
public void onStop() { 
    super.onStop(); 
    testCompositeDisposal.clear(); 
} 

輸出:

W/Debug:: First: 0 
W/Debug:: doOnDispose: observable has been disposed // I dispose Observable chain here. 
W/Debug:: First: 1 
W/Debug:: First: 2 
W/Debug:: First: 3 
W/Debug:: First: 4 

正如你可以看到上面的日誌輸出,當我處理給rxJava觀察到鏈只有第一可觀察的停止發射物品。

我想停止所有被鏈接的觀測值。

解決此問題的慣用方法是什麼?

+0

所有這些Observable.create東西看起來很evilish :-)。首先:你需要添加平面圖的可觀察物到你的複合一次性物品上。第二:不要在任何地方創建()。這不是rxjava的設計方式。使用.timeout/.timer/.delay的Observable.just(...)或Observable.range()效果很好。 –

+0

@EmanuelSeibold這只是我實現的一個演示。我在這裏使用計時器來模擬網絡通話。我已經使用doOnSubscribe來獲得一次性平面地圖可觀察。它拋出奇怪的例外。 – chandil03

回答

4

兩件事情:

  • flatMap可以預先消耗物品從上游(最多16上機器人);
  • 在調用onNext之前,您應該檢查觀察者是否處置(通過.isDisposed())並在這種情況發生時中止,其次是更適用於您的用例。

此外,flatMap被終止(實際上它永遠不會被調用)。 第一個繼續。

編輯

private void testLoadData() { 
    Disposable disposable = Observable.create(sbr -> { 
     for (int i = 0; i < 5; i++) { 
      if(sbr.isDisposed()) return; // this will cause subscription to terminate. 
      Thread.sleep(3000); 
      Log.w("Debug: ", "First: " + i); 
      sbr.onNext(true); 
     } 
     sbr.onComplete(); 
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> { 
     for (int i = 0; i < 5; i++) { 
      Thread.sleep(3000); 
      Log.w("Debug: ", "Second: " + i); 
      sbr.onNext(true); 
     } 
     sbr.onComplete(); 
    })).doOnNext(value -> { 
     Log.w("Debug: ", "doONNext"); 
    }).doOnDispose(()-> { 
     Log.w("Debug: ", "doOnDispose: observable has been disposed"); 
    }).subscribe(); 

    testCompositeDisposal.add(disposable); 
} 
+0

我可以應付第二點,但這是做到這一點的唯一方法嗎?我的意思是這是慣用的方式? – chandil03

+0

還有一件事,我該如何中止?拋出錯誤可觀察? – chandil03

+1

是的,這是終止迭代的正確方法。而當用戶被處置時,你只需從lambda返回,不需要其他任何東西。請注意,你沒有解釋你的用例是什麼,因爲迄今爲止所顯示的所有內容都可以通過現有的操作員完成。 –