我想讀取Observable
中的我的InputStream
併發出解析的數據(比如說DataPacket
)。我也希望有不同的subscribers
來處理不同類型的DataPacket
(每個subscriber
將應用它自己的過濾器到最初的observable
)。這意味着,Observable
應該在不同的subscribers
之間共享狀態。我決定使用share()
,但遇到MissingBackpressureException
。StringObservable.from(InputStream).share()導致立即MissingBackPressure
下面的代碼失敗:
readSubscription = StringObservable.from(mInStream,1024)
.share()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
}
});
我什麼都不做,我subscribe
方法 - subscriber
應該足夠快。
一切都很好,如果我刪除share()
。此代碼的工作:
readSubscription = StringObservable.from(mInStream,1024)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
}
});
我明白,share()
可以是一個昂貴的操作,我的InputStream
(每秒〜100個消息)產生大量的消息。
我的問題:如何實現一個Observable
其內容InputStream
並分享不同Subscribers
之間的狀態。