我正在研究一個庫,它將採用一個對象DataRequest
作爲輸入參數並基於該對象構建一個URL,然後調用我們的應用程序服務器使用apache http客戶端,然後將響應返回給使用我們庫的客戶。有些客戶會撥打executeSync
方法獲得相同的功能,一些客戶會撥打我們的executeAsync
方法來獲取數據。在多線程環境中並行執行每個子任務
executeSync()
- 等待,直到我有一個結果,返回結果。executeAsync()
- 返回一個Future,如果需要,可以在其他事情完成後立即處理。
下面是我DataClient
類具有以上兩種方法:
public class DataClient implements Client {
private final ForkJoinPool forkJoinPool = new ForkJoinPool(16);
private CloseableHttpClient httpClientBuilder;
// initializing httpclient only once
public DataClient() {
try {
RequestConfig requestConfig =
RequestConfig.custom().setConnectionRequestTimeout(500).setConnectTimeout(500)
.setSocketTimeout(500).setStaleConnectionCheckEnabled(false).build();
SocketConfig socketConfig =
SocketConfig.custom().setSoKeepAlive(true).setTcpNoDelay(true).build();
PoolingHttpClientConnectionManager poolingHttpClientConnectionManager =
new PoolingHttpClientConnectionManager();
poolingHttpClientConnectionManager.setMaxTotal(300);
poolingHttpClientConnectionManager.setDefaultMaxPerRoute(200);
httpClientBuilder =
HttpClientBuilder.create().setConnectionManager(poolingHttpClientConnectionManager)
.setDefaultRequestConfig(requestConfig).setDefaultSocketConfig(socketConfig).build();
} catch (Exception ex) {
// log error
}
}
@Override
public List<DataResponse> executeSync(DataRequest key) {
List<DataResponse> responsList = null;
Future<List<DataResponse>> responseFuture = null;
try {
responseFuture = executeAsync(key);
responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException | ExecutionException | InterruptedException ex) {
responsList =
Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT,
DataStatusEnum.ERROR));
responseFuture.cancel(true);
// logging exception here
}
return responsList;
}
@Override
public Future<List<DataResponse>> executeAsync(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder);
return this.forkJoinPool.submit(task);
}
}
下面是我DataFetcherTask
類也有一個靜態類DataRequestTask
它通過使URL調用我們的應用服務器:
public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {
private final DataRequest key;
private final CloseableHttpClient httpClientBuilder;
public DataFetcherTask(DataRequest key, CloseableHttpClient httpClientBuilder) {
this.key = key;
this.httpClientBuilder = httpClientBuilder;
}
@Override
protected List<DataResponse> compute() {
// Create subtasks for the key and invoke them
List<DataRequestTask> requestTasks = requestTasks(generateKeys());
invokeAll(requestTasks);
// All tasks are finished if invokeAll() returns.
List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
for (DataRequestTask task : requestTasks) {
try {
responseList.add(task.get());
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
return Collections.emptyList();
}
}
return responseList;
}
private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
List<DataRequestTask> tasks = new ArrayList<>(keys.size());
for (DataRequest key : keys) {
tasks.add(new DataRequestTask(key));
}
return tasks;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
/** Inner class for the subtasks. */
private static class DataRequestTask extends RecursiveTask<DataResponse> {
private final DataRequest request;
public DataRequestTask(DataRequest request) {
this.request = request;
}
@Override
protected DataResponse compute() {
return performDataRequest(this.request);
}
private DataResponse performDataRequest(DataRequest key) {
MappingHolder mappings = DataMapping.getMappings(key.getType());
List<String> hostnames = mappings.getAllHostnames(key);
for (String hostname : hostnames) {
String url = generateUrl(hostname);
HttpGet httpGet = new HttpGet(url);
httpGet.setConfig(generateRequestConfig());
httpGet.addHeader(key.getHeader());
try (CloseableHttpResponse response = httpClientBuilder.execute(httpGet)) {
HttpEntity entity = response.getEntity();
String responseBody =
TestUtils.isEmpty(entity) ? null : IOUtils.toString(entity.getContent(),
StandardCharsets.UTF_8);
return new DataResponse(responseBody, DataErrorEnum.OK, DataStatusEnum.OK);
} catch (IOException ex) {
// log error
}
}
return new DataResponse(DataErrorEnum.SERVERS_DOWN, DataStatusEnum.ERROR);
}
}
}
對於每個DataRequest
對象,都有一個DataResponse
對象。現在有人通過傳遞DataRequest
對象調用我們的庫,在內部我們製作List<DataRequest>
對象,然後我們並行調用每個對象DataRequest
,並返回List<DataResponse>
,其中列表中的每個DataResponse
對象都會響應對應的對象DataRequest
對象。
下面是流量:
- 客戶將通過傳遞
DataRequest
對象調用DataClient
類。他們可以根據他們的要求調用executeSync()
或executeAsync()
方法。 - 現在在
DataFetcherTask
類(這是ForkJoinTask's
亞型一個RecursiveTask
一個),給定key
對象,它是一個單一的DataRequest
,我會產生List<DataRequest>
,然後調用每個子任務並行爲列表中的每個對象DataRequest
。這些子任務與父任務在相同的ForkJoinPool
中執行。 - 現在在
DataRequestTask
類中,我通過創建URL並將其DataResponse
對象返回來執行每個DataRequest
對象。
問題陳述:
因爲這個庫被稱爲一個非常高的吞吐量的環境,因此必須非常快。對於同步調用,在單獨的線程中執行可以嗎?這將導致線程的額外成本和資源以及線程的上下文切換成本,所以我有點混淆。此外,我在這裏使用ForkJoinPool
這將節省我使用額外的線程池,但它是在這裏正確的選擇?
有沒有更好的和有效的方式來做同樣的事情,也可以提高性能?我使用的是Java 7,並且可以訪問Guava庫,所以如果它可以簡化任何事情,那麼我也可以開放它。
看起來我們在負載很重時會看到一些爭用。有沒有什麼辦法可以使這個代碼在非常重的負載下運行時進入線程爭用?
聽起來像[ThreadPool](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html)會很有用,但請記住,過早優化是所有邪惡的來源 –
@ScaryWombat同意,這就是爲什麼我會做負載測試,但問題是我使用ForkJoinPool這也是ThreadPool的專用形式是合理的。然後,我使用executeSync方法的方式是否正確? – john
你看到了什麼樣的爭用?也許是重負載新的ForkJoinPool(16);'是不夠的,嘗試增加'16'到一個更大的值 – Teg