2016-11-14 50 views
2

此代碼RXJava份額()不Observable.create工作()

public class ConnectObs { 
public static void main(String[] args) { 

    Observable<Integer> intsObservable = Observable.just(1, 2); 
    intsObservable = intsObservable.share(); 

    intsObservable.subscribe(s->System.out.println("A " + s)); 
    intsObservable.subscribe(s->System.out.println("B " + s)); 

    intsObservable = Observable.create(s -> { 
     s.onNext(1); 
     s.onNext(2); 
    }); 
    intsObservable = intsObservable.share(); 

    intsObservable.subscribe(s->System.out.println("C " + s)); 
    intsObservable.subscribe(s->System.out.println("D " + s)); 
    } 
} 

產生用於A,B和C的結果,但不是爲d - 這是爲什麼?

結果如下:

A 1 
A 2 
B 1 
B 2 
C 1 
C 2 

回答

2

Observable.just和您的自定義可觀測(其不被方式安全標準建造)之間的重要區別是,你這樣C認購仍處於活動狀態時,沒有完成的流D訂閱發生,因此D只是等待更多的未來排放量。

你的創作應該是這樣的:

Observable.<Integer> create(s -> { 
     s.onNext(1); 
     s.onNext(2); 
     s.onCompleted(); 
}) 
//prevent MissingBackpressureException 
.onBackpressureBuffer(); 

有那麼一點友好的用戶您可以添加unsubscribe檢查過:

Observable.<Integer> create(s -> { 
     s.onNext(1); 
     if (!s.isUnsubscribed()) 
      s.onNext(2); 
     if (!s.isUnsubscribed()) 
      s.onCompleted(); 
}).onBackpressureBuffer();