我是Java中多線程的新手。Java如何實現並行性,動態分配工作?
我的目標是讓一個線程讀取文件,然後將工作塊傳遞給工作線程並行處理。
這裏有一個非常好的例子。 Concurrency Tutorial
此代碼片段獲取工作列表(ArrayList<String> URLs
)並將其轉儲到具有Task.call()方法中指定的函數的工作線程塊上。
void pingAndReportEachWhenKnown() throws InterruptedException, ExecutionException {
int numThreads = URLs.size() > 4 ? 4 : URLs.size(); //max 4 threads
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
CompletionService<PingResult> compService = new ExecutorCompletionService<>(executor);
for(String url : URLs){
Task task = new Task(url);
compService.submit(task);
}
for(String url : URLs){
Future<PingResult> future = compService.take();
log(future.get());
}
executor.shutdown(); //always reclaim resources
}
這正是我想要做的,但我需要改變。 我的工作隊列的大小不適合工作內存(巨大文件),所以我需要緩衝讀取線。我可以用ArrayBlockingQueue實現Blocking我需要的功能。但是,我還需要將任務分配緩衝到CompletionService。工作塊大小會有所不同,因此完成時間也會有所不同。
我怎麼不把太多的compService工作隊列?下面的代碼會一次放置一個項目,因爲它會在嘗試從隊列中獲取另一個任務之前等待完成。所以這是不夠的。處理這個問題的正確或最好的方法是什麼?
for(;;){
Task task = arrayBlockingQueue.take(); //Blocking operation
compService.submit(task);
Future<PingResult> future = compService.take(); //Blocking operation
log(future.get());
}
這不幸的是不會工作。一旦隊列已滿並且所有線程忙於拋出「RejectedExecutionException」。 –
@JohnVint,看起來像是一種無需自己構建就能節流的方法。看到我更新的答案。 –
我喜歡它,即使隊列飽和,你仍然可以取得進步。 –