2016-11-09 74 views
0

在RxJava 1/RxScala中,如何在下列情況下節流/背壓可觀測源?基於資源稀缺可觀測到的背壓

def fast: Observable[Foo] // Supports backpressure 

def afterExpensiveOp: Observable[Bar] = 
    fast.flatMap(foo => Observable.from(expensiveOp(foo)) 

// Signature and behavior is out of my control 
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = { 
    if(noResources()) Future.failed(new OutOfResourcesException()) 
    else Future { Bar() } 
} 

一個可能的解決方案是阻止直到。其工作,但,這是非常不雅,防止多個同時請求:

def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo => 
    Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head) 
) 
+1

可能你必須爲此編寫自己的操作符。在RxScala中,一個運算符是一個函數'Subscriber [T] => Subscriber [R]',您可以使用'lift'將它應用到'Observable'。在某些時候,您創建的Subscriber [R]將不得不檢查是否有可用的資源,如果是,請調用其繼承的'request'方法以從'fast'可觀察值中獲取更多項。 –

回答

0

flatMap有一個參數來限制併發用戶的數量。如果你使用這個flatMap來照顧你的背壓。

def afterExpensiveOp = fast.flatMap(safeNumberOfConccurrentExpensiveOps, x => Observable.from(expensiveOp(x)))