2015-06-29 63 views
1

我需要在RxJava中創建一個ConnectableSubject的等效物。我想在任何訂閱它之前將事件發佈到主題,並且我希望訂閱者獲得所有事件。我不想使用ReplaySubject來緩存這些事件,因爲我不想讓它們在被使用後留在內存中。在RxJava中創建一個ConnectableSubject的等價物

理想情況下,此代碼可行,但第一個事件丟失。

PublishSubject<String> subject = PublishSubject.create(); 
ConnectableObservable<String> observable = subject.publish(); 

subject.onNext("1"); 
observable.subscribe(new Action1<String>() { 
    @Override 
    public void call(String s) { 
     System.out.println("string is " + s); 
    } 
}); 
observable.connect(); 
subject.onNext("2"); 
subject.onNext("3"); 

此代碼的輸出是

string is 2 
string is 3 

在理想情況下它應該是

string is 1 
string is 2 
string is 3 
+0

[ReplaySubject](http://reactivex.io/RxJava/javadoc/rx/subjects/ReplaySubject.html)與您正在尋找的內容非常接近,但似乎無限期地緩存了所有未來訂閱者的輸入內容(而不是比_first_訂戶)。儘管有一些選項 - ['createWithSize(int size)'](http://reactivex.io/RxJava/javadoc/rx/subjects/ReplaySubject.html#createWithSize(int)),您可以指定最大緩存大小。 –

回答

1

調用上的PublishSubjectpublish()在功能上無操作。最需要的是內部的BufferUntilSubscriber,它會緩存事件,直到單個Subscriber訂閱它爲止。但是,它不支持背壓或多於一個Subscriber,因此您可能仍需要使用publish()onBackpressureBuffer()