2016-11-02 40 views
0

我有時必須在Observable中執行一些清理任務(例如關閉打開的文件),我想知道什麼是最好的方法。 到目前爲止,我已經看到了兩個,但我很難理解它們有什麼不同:你能解釋一下這些差異嗎?是否有更好的方法來實現這一點?RxJava Observable.doOnUnsubscribe()vs Subscriber.add()

1)

// MyObject will take care of calling onNext(), onError() and onCompleted() 
    // on the subscriber. 
    final MyObject o = new MyObject(); 

    Observable obs = Observable.create(new Observable.OnSubscribe<Object>() { 
     @Override 
     public void call(Subscriber<? super Object> subscriber) { 
      try { 
       if (!subscriber.isUnsubscribed()) { 

        o.setSubscriber(subscriber); 

        // This will tell MyObject to start allocating resources and do its job. 
        o.start(); 

       } 
      } catch (Exception e) { 
       subscriber.onError(e); 
      } 
     } 
    }).doOnUnsubscribe(new Action0() { 
     @Override 
     public void call() { 
      // This will tell MyObject to finish its job and deallocate any resources. 
      o.stop(); 
     } 
    }); 

2)

Observable obs = Observable.create(new Observable.OnSubscribe<Object>() { 
     @Override 
     public void call(Subscriber<? super Object> subscriber) { 
      try { 
       if (!subscriber.isUnsubscribed()) { 

        // MyObject will take care of calling onNext(), onError() and onCompleted() 
        // on the subscriber. 
        final MyObject o = new MyObject(subscriber); 

        subscriber.add(Subscriptions.create(new Action0() { 
         @Override 
         public void call() { 
          // This will tell MyObject to finish its job and deallocate any resources. 
          o.stop(); 
         } 
        })); 

        // This will tell MyObject to start allocating resources and do its job. 
        o.start(); 

       } 
      } catch (Exception e) { 
       subscriber.onError(e); 
      } 
     } 
    }); 
+1

您已經使用'doOnSubscribe資源'在示例1中。它應該是問題標題中提到的'doOnUnsubscribe'。 –

+0

@PraveerGupta謝謝我修復了錯字! –

回答

1

要回答你的原問題,doOnUnSubscribe和加到Subscriber是相同的。實際上,當您致電doOnUnSubscribe時,它只會將您的Action作爲Subscription添加到您的Subscriber。所以,doOnUnSubscribe在後臺使用你的第二個例子。

doOnUnSubscribe代碼:

public class OperatorDoOnUnsubscribe<T> implements Operator<T, T> { 
    private final Action0 unsubscribe; 

/** 
* Constructs an instance of the operator with the callback that gets invoked when the modified Observable is unsubscribed 
* @param unsubscribe The action that gets invoked when the modified {@link rx.Observable} is unsubscribed 
*/ 
public OperatorDoOnUnsubscribe(Action0 unsubscribe) { 
    this.unsubscribe = unsubscribe; 
} 

@Override 
public Subscriber<? super T> call(final Subscriber<? super T> child) { 
    child.add(Subscriptions.create(unsubscribe)); 

    // Pass through since this operator is for notification only, there is 
    // no change to the stream whatsoever. 
    return Subscribers.wrap(child); 
    } 
} 
0

什麼是使用Observable.create如果你不把與subscriber.onNext下游的任何值的點?

第一個問題是一個巨大的禁忌,因爲您正在對已關閉的對象執行副作用。如果您同時從兩個不同的線程訂閱創建的observable,會發生什麼?

第二個看起來更好,因爲你添加了subscriber.add,如果訂閱已經處理,它將調用o.stop()。唯一缺少的是onNext的值,即值向下遊傳遞。

有一個用於從資源創建Observables的操作符,稱爲「using」。請看看http://reactivex.io/RxJava/javadoc/rx/Observable.html#using(rx.functions.Func0,%20rx.functions.Func1,%20rx.functions.Action1)

+0

關於你的第一句話:MyObject將在使用setSubscriber()啓動start()後立即調用onNext/onCompleted/onError。 關於你的第二句話:我不明白,你能詳細闡述一下嗎? –

+0

您正在使用哪個版本的RxJava?我想重構你的例子。 MyObject的類型是什麼? –

+0

使用rxJava 1.2.0 –

0

首先是不使用Observable.create(OnSubscribe)如果你能幫助它,因爲你可以輕鬆突破的東西(如背壓或可觀察的合同有關的事情)。您應該使用許多靜態工廠方法之一。

除了直接解決你的問題,我建議Observable.using這是明確設計釋放終止或取消訂閱資源。

例如:

Observable<byte[]> bytes = 
    Observable.using(
    () -> new FileInputStream(file), //resource factory 
    is -> Bytes.from(is), // observable factory 
    is -> is.close() // close action 
);  

上面的例子錯過一些嘗試副漁獲物(例如大約is.close())如果你使用RxJava 2.

試圖以充實你的情況將不會出現:

Observable.using(
() -> new MyObject(), //resource factory 
    myObject -> makeObservable(myObject), // observable factory 
    myObject -> myObject.stop() // close action 
);  
+0

我試過你的方法,雖然它不適用於我,因爲調用onError/onNext/onCompleted的邏輯是在MyObject(在你的情況下由FileInputStream表示)的內部。 我想這次我無法逃避'Observable.create(OnSubscribe)'。 –

+0

您的可觀察創作可以獨立於關閉方面。我在答案中添加了更多細節。最終,您應該探討在另一個StackOverflow問題中可能不使用'Observable.create'。 –

1

決定這兩個解決方案,前提是你已經提到,使用取決於ü請確認您嘗試使用/關閉/處置的資源是否意圖在多個訂閱之間共享。

  1. 使用subscriber.add(...)當資源用於生成事件。在這種情況下,你不會想分享資源。

    • 這就是你的例子MyObject的情況。這有利於資源不會暴露在Observable.create()方法之外,從而使資源免受意外的副作用。
  2. 使用doOnUnsubscribe當你要共享多個訂閱東西。

    • 舉個例子,如果你想有多少次使用的可觀察到一個櫃檯,你可以有一個共同的計數器,並繼續在doOnUnsubscribedoOnSubscribe遞增。
    • 另一個例子是,如果您想要計算當前有多少個連接打開資源,則可以使用doOnSubscribedoOnUnsubscribe中的遞增和遞減組合來實現該功能。
在你的榜樣

此外,而不是創建MyObject抽象,它是管理開放和資源,產生事件的最後,你可以用Observable.using()方法來達到同樣的更換。這需要在三個參數:

  • resourceFactory,這將打開資源,
  • observableFactory,這將產生事件和
  • disposeAction,這將關閉