2016-02-06 52 views
0

我正在使用RxJava異步處理servlet請求。在每個請求期間,使用flatMap運算符對遠程服務API進行一系列異步調用。使用Rx限制遠程資源的使用情況

由於資源限制,我需要限制針對該API的併發請求總數。使用其併發性參數在單個flatMap中調用API的單個Rx流將是微不足道的。但我的應用程序中有多個獨立的流(每個ServletRequest基本上都有一個流),每個流都有多個到API的flatMap調用。

所以我想我將不得不將所有的遠程請求彙集到執行實際調用的單例流中,然後可以輕鬆地限制併發性。但是,將API響應恢復到原始流中似乎並不簡單。另外,在這樣的結構中保持背壓似乎很複雜。

另一個選擇是使用傳統的信號量,但我不確定它的阻塞行爲是否適合Rx。

那麼是否有一個既定的模式來實現這樣的事情?還是我錯過了一個巧妙的運算符組合,完全避免了這些複雜問題?

回答

0

在RxJava你可以從常規的Java執行者創建自己的調度程序:

ExecutorService exec= Executors.newFixedThreadPool(2); //2 Fixed threads 
Schedulers.from(exec); 

所以只是創建爲每個資源的有限數量的線程的執行,並使用特定的調度每當訪問資源。有限數量的線程將限制併發呼叫的數量。

編輯:

顯然我誤解了這個問題。如果呼叫是異步的,您可以嘗試使用Rx的背壓來管理它們。下面是有關如何使用的Rx管理此類電話的想法:

您創建一個「資源許可證可觀察的」發射的東西(某種令牌)每當API 可以被調用。其令牌(許可證)創建速率將是該API的最大使用率。每當有些可觀察的需要調用API時,只需zip帶有可觀察許可的調用。該郵編操作將阻塞,直到許可證可用,限制API調用許可證代

的速度這裏有一個簡單的實現而觀察到的許可證發出時間戳:

public class PermitObservable extends Observable<Long> { 

    private final long msBetweenEmissions; 

    public PermitObservable(long msBetweenEmissions) { 
     super(new SyncOnSubscribe<Long, Long>() { 
      @Override 
      protected Long generateState() { 
       return System.currentTimeMillis(); 
      } 

      @Override 
      protected Long next(Long state, Observer<? super Long> observer) { 
       long nextEmissionAt = state + msBetweenEmissions; 
       long timeToWait = nextEmissionAt - System.currentTimeMillis(); 
       if (timeToWait > 0) { 
        try { 
         Thread.sleep(timeToWait); 
        } catch (InterruptedException e) { 
         observer.onError(e); 
        } 
       } 
       long now = System.currentTimeMillis(); 
       observer.onNext(Long.valueOf(now)); // Permit emission 
       return now; 
      } 
     }); 

     this.msBetweenEmissions = msBetweenEmissions; 
    } 
} 
+0

我想到了這一點。但是,遠程請求是異步的,如果我正確理解調度程序,那麼我只會限制異步服務的併發調用數量,而不是掛起的請求數量,對嗎?所以我需要阻止這些請求,這樣在我得到來自遠程API的響應之前線程不會被釋放。 – tobiasH

+0

這取決於Java執行程序。您可以使用一個執行程序來阻止超過一定數量的待處理請求(請參閱:http://stackoverflow.com/a/4522411/3199595) – Malt

+0

我不確定我們是否在談論相同的事情。通過等待,我的意思是等待遠程服務。我的API請求是非阻塞的,所以我認爲在您的解決方案中,我只會將兩個請求以並行方式提交給遠程服務,但由於在接收到響應之前線程不會被阻塞,所以仍然會使(遠程)隊列飽和, 對?我不確定隊列實現如何幫助解決這個問題。 – tobiasH

相關問題