2015-05-18 20 views
0

我想讀取Observable中的我的InputStream併發出解析的數據(比如說DataPacket)。我也希望有不同的subscribers來處理不同類型的DataPacket(每個subscriber將應用它自己的過濾器到最初的observable)。這意味着,Observable應該在不同的subscribers之間共享狀態。我決定使用share(),但遇到MissingBackpressureExceptionStringObservable.from(InputStream).share()導致立即MissingBackPressure

下面的代碼失敗:

readSubscription = StringObservable.from(mInStream,1024) 
     .share() 
     .subscribeOn(Schedulers.io()) 
     .observeOn(Schedulers.newThread()) 
     .subscribe(new Action1<byte[]>() { 
      @Override 
      public void call(byte[] bytes) { 

      } 
     }); 

我什麼都不做,我subscribe方法 - subscriber應該足夠快。

一切都很好,如果我刪除share()。此代碼的工作:

readSubscription = StringObservable.from(mInStream,1024) 
     .subscribeOn(Schedulers.io()) 
     .observeOn(Schedulers.newThread()) 
     .subscribe(new Action1<byte[]>() { 
      @Override 
      public void call(byte[] bytes) { 

      } 
     }); 

我明白,share()可以是一個昂貴的操作,我的InputStream(每秒〜100個消息)產生大量的消息。

我的問題:如何實現一個Observable其內容InputStream並分享不同Subscribers之間的狀態。

回答

1

當前v0.22不能正確支持背壓,因此您應該使用onBackpressureBuffer來避免MissingBackpressureException。我會看看我們是否可以發佈應該工作的最新代碼。

此外,使用share()可能會令人驚訝,因爲它確實引用了對用戶的計數。您無法一次性訂閱所有訂閱者,有些訂閱者可能無法從一開始就收到所有的值。相反,您可以使用publish()運算符,並在所有訂閱者訂閱後,在返回的observable上調用connect()

您也可以使用cache(),它會將來源重播給任何延遲的用戶,但它不支持背壓,您也需要使用onBackpressureBuffer