我擴展AbstractOnSubscribe
創建我自己的OnSubscribe
與Observable.create(OnSubscribe<T>)
一起使用,我命名爲OnSubscribeInputStreamToLines
,基本上讀取InputStream
併爲每行調用onNext
。如何將AbstractOnSubscribe轉換爲與RxJava背壓支持的運營商?
事情是,我也需要這樣做,InputStream
是其他Observable
的一部分。
最簡單的解決辦法是做到以下幾點:
Observable<InputStream> isObservable = ...;
isObservable
.flatMap(is -> Observable.create(new OnSubscribeInputStreamToLines(is)));
的事情是,不會是真的有效,因爲它會造成可觀察到的每個的inputStream。我想我可以用Observable.lift
來做到這一點。
有沒有辦法讓我可以輕鬆地將我的OnSubscribeInputStreamToLines
轉換爲Operator
?
我主要擔心的問題背壓,我會叫onNext
爲InputStream
的每一行,雖然AbstractOnSubscribe
支持背壓,我找不到一個AbstractOperator
做同樣的。
感謝
我看到akarnokd也回答了。你不會找到比他更有知識的來源! –