在RxJava你可以從常規的Java執行者創建自己的調度程序:
ExecutorService exec= Executors.newFixedThreadPool(2); //2 Fixed threads
Schedulers.from(exec);
所以只是創建爲每個資源的有限數量的線程的執行,並使用特定的調度每當訪問資源。有限數量的線程將限制併發呼叫的數量。
編輯:
顯然我誤解了這個問題。如果呼叫是異步的,您可以嘗試使用Rx的背壓來管理它們。下面是有關如何使用的Rx管理此類電話的想法:
您創建一個「資源許可證可觀察的」發射的東西(某種令牌)每當API 可以被調用。其令牌(許可證)創建速率將是該API的最大使用率。每當有些可觀察的需要調用API時,只需zip帶有可觀察許可的調用。該郵編操作將阻塞,直到許可證可用,限制API調用許可證代
的速度這裏有一個簡單的實現而觀察到的許可證發出時間戳:
public class PermitObservable extends Observable<Long> {
private final long msBetweenEmissions;
public PermitObservable(long msBetweenEmissions) {
super(new SyncOnSubscribe<Long, Long>() {
@Override
protected Long generateState() {
return System.currentTimeMillis();
}
@Override
protected Long next(Long state, Observer<? super Long> observer) {
long nextEmissionAt = state + msBetweenEmissions;
long timeToWait = nextEmissionAt - System.currentTimeMillis();
if (timeToWait > 0) {
try {
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
observer.onError(e);
}
}
long now = System.currentTimeMillis();
observer.onNext(Long.valueOf(now)); // Permit emission
return now;
}
});
this.msBetweenEmissions = msBetweenEmissions;
}
}
我想到了這一點。但是,遠程請求是異步的,如果我正確理解調度程序,那麼我只會限制異步服務的併發調用數量,而不是掛起的請求數量,對嗎?所以我需要阻止這些請求,這樣在我得到來自遠程API的響應之前線程不會被釋放。 – tobiasH
這取決於Java執行程序。您可以使用一個執行程序來阻止超過一定數量的待處理請求(請參閱:http://stackoverflow.com/a/4522411/3199595) – Malt
我不確定我們是否在談論相同的事情。通過等待,我的意思是等待遠程服務。我的API請求是非阻塞的,所以我認爲在您的解決方案中,我只會將兩個請求以並行方式提交給遠程服務,但由於在接收到響應之前線程不會被阻塞,所以仍然會使(遠程)隊列飽和, 對?我不確定隊列實現如何幫助解決這個問題。 – tobiasH