2015-09-17 30 views
1

我正在開發一個簡單的REST應用程序,它利用RxJava將請求發送到遠程服務器(1)。對於每個到REST API的傳入請求,發送一個請求(使用RxJava和RxNetty)到(1)。一切工作正常,但現在我有一個新的用例:限制使用RxJava由多個線程創建的多個觀察值

爲了不轟炸(1)太多的請求,我需要實施速率限制。解決這個問題的一種方法(我假設)是將向(1)發送請求時創建的每個Observable添加到實際限速的另一個Observable(2)中。 (2)將或多或少地像隊列一樣工作,並儘可能快地處理出站請求(但不會比速率限制更快)。這裏有一些僞代碼:

Observable<MyResponse> r1 = createRequestToExternalServer() // In thread 1 
Observable<MyResponse> r2 = createRequestToExternalServer() // In thread 2 

// Somehow send r1 and r2 to the "rate limiter" observable, (2) 

rateLimiterObservable.sample(1/rate, TimeUnit.MILLISECONDS) 

我該如何使用Rx/RxJava解決這個問題?

回答

0

我會使用一個熱計時器隨着原子計數器跟蹤在給定期限剩餘的連接:

int rate = 5; 
long interval = 1000; 

AtomicInteger remaining = new AtomicInteger(rate); 

ConnectableObservable<Long> timer = Observable 
     .interval(interval, TimeUnit.MILLISECONDS) 
     .doOnNext(e -> remaining.set(rate)) 
     .publish(); 

timer.connect(); 

Observable<Integer> networkCall = Observable.just(1).delay(150, TimeUnit.MILLISECONDS); 

Observable<Integer> limitedNetworkCall = Observable 
     .defer(() -> { 
      if (remaining.getAndDecrement() != 0) { 
       return networkCall; 
      } 
      return Observable.error(new RuntimeException("Rate exceeded")); 
     }); 

Observable.interval(100, TimeUnit.MILLISECONDS) 
.flatMap(t -> limitedNetworkCall.onErrorReturn(e -> -1)) 
.take(20) 
.toBlocking() 
.forEach(System.out::println); 
+0

是否有可能在請求觀測建立背壓,而不是拋出的?我需要爲此編寫自定義運算符嗎? – tobiasH