2017-02-26 96 views
1

我的用例與將RxJava2與Firebase數據庫一起使用有關。RxJava2從監聽器創建Flowable並在最後刪除監聽器

我有DatabaseReference,我可以爲它註冊值監聽器。 我可以轉換成這樣的可流動:

disposable = Flowable.create<DataSnapshot>({ s -> 
      dbRef.addValueEventListener(object : ValueEventListener { 
       override fun onCancelled(p0: DatabaseError) {...} 

       override fun onDataChange(value: DataSnapshot) { 
        s.onNext(value) 
       } 
      }) 
     }, BackpressureStrategy.BUFFER) 
     .subscribe(...) 

我想能夠一次性設置時以除去偵聽器。 任何想法我可以做到這一點?

我看到,在rxjava 1有this possibility也許,但它不是在rxjava2

回答

2

隨着RxJava2提供您需要使用setCancellable()方法,並把你的聽衆移除代碼在那裏。
這與使用Observable.fromEmitter()創建Observable時RxJava1的Emitter.setCancellation()非常相似。

也藉此說明akarnokd關於取消:
「但需要注意的是,除非創建邏輯放棄調度(通過終止或去異步),取消邏輯可能不會永遠執行,由於同池活鎖」。 (RxJava 2: always unsubscribe on the .subscribeOn(..) scheduler?