2015-09-15 74 views
1

我擴展AbstractOnSubscribe創建我自己的OnSubscribeObservable.create(OnSubscribe<T>)一起使用,我命名爲OnSubscribeInputStreamToLines,基本上讀取InputStream併爲每行調用onNext如何將AbstractOnSubscribe轉換爲與RxJava背壓支持的運營商?

事情是,我也需要這樣做,InputStream是其他Observable的一部分。

最簡單的解決辦法是做到以下幾點:

Observable<InputStream> isObservable = ...; 

isObservable 
    .flatMap(is -> Observable.create(new OnSubscribeInputStreamToLines(is))); 

的事情是,不會是真的有效,因爲它會造成可觀察到的每個的inputStream。我想我可以用Observable.lift來做到這一點。

有沒有辦法讓我可以輕鬆地將我的OnSubscribeInputStreamToLines轉換爲Operator

我主要擔心的問題背壓,我會叫onNextInputStream的每一行,雖然AbstractOnSubscribe支持背壓,我找不到一個AbstractOperator做同樣的。

感謝

回答

1

這裏的區別是,你OnSubscribeInputStreamToLines是一個入口點可觀察世界,而lift是一箇中間運營商現有的序列。此外,整個吞吐量可能被IO操作背後InputStream爲主或在操作字符串處理,所以我不會擔心瘦包裝。

AbstractOnSubscribe捕獲操作符的生成器方面,它可以幫助您構建背壓感知值發射器(一般爲冷源),您可以在其中繪製出發射值,發射時間和發射值。另一方面,由於Operator可以更自由地與上游值和下游請求進行交互,所以不能一概而論。它們針對特定任務進行了高度定製,而且它們幾乎沒有共同點。它們可以從一組基元構建而成,但就是這樣(我已經寫了數百個)。

所以不要害怕flatMap平過的事情。

1

不要打擾有關創建另一個觀察到的,每個InputStream。開銷可能是不一樣大,你可能會認爲特別是相對於與lift相關的開銷。

我不知道您正在使用的InputStream的性質,但您應該考慮Observable.using()以安全地關閉這些資源(終止或取消訂閱)。

你絕對正確地對寫背壓支持Operator猶豫不決。這是非常棘手的地上踩着,除非你正在撰寫現有Operator秒。

+0

我看到akarnokd也回答了。你不會找到比他更有知識的來源! –