0
並行請求有序流我正在尋找一種方式來使用流量來實現以下目標:在助焊劑
- 處理100萬(或更多)的請求流。
- 請求編號(從1到1,000,000)。
- 應該使用10個線程並行啓動請求。
- 啓動請求的順序取決於它們的序列號。
- 結果助焊劑應按照與請求相同的順序返回響應。
- 結果助焊劑應發出每個響應AS ASON AS它的所有前輩都可用。
我知道#4的答案是對調度程序使用單個執行程序。但是,我不知道如何實現#6。
下面是一個示例場景:1,2,3,4,5,6,7,8,9,10都推出
- 請求。
- 響應2到達(請求11啓動,因爲有線程可用)。
- 響應5到達(請求12啓動)。
- 響應4到達(請求13啓動)。
- 響應1到達(請求14啓動)。
-
響應1和2被髮射並開始處理它們。
- 響應3到達(請求15啓動)。
-
響應3,4,5被髮射並開始處理。
所以 - 我應該如何修改下面的代碼,以達到#6?
public class Example {
private final Scheduler scheduler = Schedulers.fromExecutor(
\t Executors.newFixedThreadPool(10));
public void start() {
\t Flux<Request> requestFlux = getFluxOfOneMillionRequests(); // Never mind how this is achieved
\t
\t Flux<Response> responseFlux = flux.flatMap(request -> doInWorkerThread(request));
\t
\t flux.doOnNext(response -> processResponse(response)).subscribe()
}
private Mono<Response> doInWorkerThread(Request request) {
\t return Mono.fromCallable(() -> {
\t \t // Do something
\t \t return new Response(request.getSerial(), someResult);
\t }).subscribeOn(scheduler);
}
private void processResponse(Response response) {
\t // Do something
}
}