1
我正在開發一個簡單的REST應用程序,它利用RxJava將請求發送到遠程服務器(1)。對於每個到REST API的傳入請求,發送一個請求(使用RxJava和RxNetty)到(1)。一切工作正常,但現在我有一個新的用例:限制使用RxJava由多個線程創建的多個觀察值
爲了不轟炸(1)太多的請求,我需要實施速率限制。解決這個問題的一種方法(我假設)是將向(1)發送請求時創建的每個Observable
添加到實際限速的另一個Observable
(2)中。 (2)將或多或少地像隊列一樣工作,並儘可能快地處理出站請求(但不會比速率限制更快)。這裏有一些僞代碼:
Observable<MyResponse> r1 = createRequestToExternalServer() // In thread 1
Observable<MyResponse> r2 = createRequestToExternalServer() // In thread 2
// Somehow send r1 and r2 to the "rate limiter" observable, (2)
rateLimiterObservable.sample(1/rate, TimeUnit.MILLISECONDS)
我該如何使用Rx/RxJava解決這個問題?
是否有可能在請求觀測建立背壓,而不是拋出的?我需要爲此編寫自定義運算符嗎? – tobiasH