0
在RxJava 1/RxScala中,如何在下列情況下節流/背壓可觀測源?基於資源稀缺可觀測到的背壓
def fast: Observable[Foo] // Supports backpressure
def afterExpensiveOp: Observable[Bar] =
fast.flatMap(foo => Observable.from(expensiveOp(foo))
// Signature and behavior is out of my control
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = {
if(noResources()) Future.failed(new OutOfResourcesException())
else Future { Bar() }
}
一個可能的解決方案是阻止直到。其工作,但,這是非常不雅,防止多個同時請求:
def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo =>
Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head)
)
可能你必須爲此編寫自己的操作符。在RxScala中,一個運算符是一個函數'Subscriber [T] => Subscriber [R]',您可以使用'lift'將它應用到'Observable'。在某些時候,您創建的Subscriber [R]將不得不檢查是否有可用的資源,如果是,請調用其繼承的'request'方法以從'fast'可觀察值中獲取更多項。 –