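Java - ExecutorService with a maximum size
Is there a way to go through a huge database and apply some jobs to its entries in parallel? I tried using ExecutorService, but it seems we have to call shutdown() in order to know when the pool has finished its work...
So far, my best solution is: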
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestCode
{
    private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest)
    {
        return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29");
    }

    public static void main(String[] args) throws Exception
    {
        int dbOffset = 0;
        int nbOfArticlesPerRequest = 100;
        int MYTHREADS = 10;
        int loopIndex = 0;
        boolean bContinue = true;
        Runnable worker;
        while (bContinue) // in this loop we'll constantly fill the pool list
        {
            loopIndex++;
            ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN...
            List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number)
            for (String id : ids) {
                worker = new MyRunnable(id);
                executor.execute(worker);
            }
            executor.shutdown();
            while (!executor.isTerminated()) {
                System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount() +
                    " - queue size: " + ((ThreadPoolExecutor) executor).getQueue().size()
                );
                TimeUnit.MILLISECONDS.sleep(500);
            }
            if (loopIndex >= 3) {
                System.out.println("\nEnd the loop #" + loopIndex + " ===> STOOOP!\n");
                bContinue = false;
            }
            dbOffset += nbOfArticlesPerRequest;
        }
    }

    public static class MyRunnable implements Runnable {
        private final String id;

        MyRunnable(String id) {
            this.id = id;
        }

        @Override
        public void run()
        {
            System.out.println("Thread '" + id + "' started");
            try {
                TimeUnit.MILLISECONDS.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread '" + id + "' stopped");
        }
    }
}
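This works fine, but the drawback is that at the end of each loop iteration I have to wait for the last thread to finish.
For example: when only 3 threads are still running...
I did the following to work around this, but is it "safe" / correct?
By the way: is there any way to know how many tasks/threads are in the queue?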
int dbOffset = 0;
int nbOfArticlesPerRequest = 5; //100;
int MYTHREADS = 2;
int loopIndex = 0;
boolean bContinue = true;
Runnable worker;
ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // **HERE IT WOULD BE A GLOBAL VARIABLE**
while (bContinue) // in this loop we'll constantly fill the pool list
{
    loopIndex++;
    List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number)
    for (String id : ids) {
        worker = new MyRunnable(id);
        executor.execute(worker);
    }
    // Wait while every worker thread is busy, then go fetch the next batch
    while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) {
        System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount() +
            " - queue size: " + ((ThreadPoolExecutor) executor).getQueue().size()
        );
        TimeUnit.MILLISECONDS.sleep(500);
    }
    if (loopIndex >= 3) {
        System.out.println("\nEnd the loop #" + loopIndex + " ===> STOOOP!\n");
        bContinue = false;
    }
    dbOffset += nbOfArticlesPerRequest;
}
executor.shutdown();
// Wait until all threads have finished
while (!executor.isTerminated()) {
    System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount() +
        " - queue size: " + ((ThreadPoolExecutor) executor).getQueue().size()
    );
    TimeUnit.MILLISECONDS.sleep(500);
}
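On the side question of how many tasks are waiting: as far as I can tell, ThreadPoolExecutor exposes this through getQueue().size(), alongside getActiveCount() and getCompletedTaskCount(). The sketch below is only an illustration; the class and method names (PoolStatsDemo, observe) are made up, and the latches exist solely so the numbers are read while the tasks are known to be blocked. In a live pool these counters are approximate snapshots.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative, not part of the original code.
final class PoolStatsDemo {

    /**
     * Submits 'tasks' blocking tasks to a pool of 'threads' workers and returns
     * {active count, queued count, completed count}. Assumes tasks >= threads.
     */
    static long[] observe(int threads, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(threads); // reaches zero once every worker runs a task
        CountDownLatch release = new CountDownLatch(1);       // keeps the tasks blocked while we measure

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        started.await();                       // all workers are now busy
        long active = pool.getActiveCount();   // threads currently executing a task
        long queued = pool.getQueue().size();  // tasks submitted but not yet started

        release.countDown();                   // let every task finish
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        long completed = pool.getCompletedTaskCount();
        return new long[] {active, queued, completed};
    }

    public static void main(String[] args) throws InterruptedException {
        long[] stats = observe(2, 5);
        System.out.println("active=" + stats[0] + " queued=" + stats[1] + " completed=" + stats[2]);
    }
}
```

Holding a ThreadPoolExecutor reference directly, rather than an ExecutorService, avoids the repeated casts used in the snippets above.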
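Edit:
I'm trying to launch 1 or 10 million tasks, so (I think) I can't put them all in the queue at once... That's why I use a global executor, so that there are always some tasks waiting in the queue (and I can't shut the executor down in between, otherwise it is no longer usable).
Latest code version: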
// Also requires: import java.util.concurrent.LinkedBlockingQueue;
int dbOffset = 0;
int nbOfArticlesPerRequest = 5; //100;
int MYTHREADS = 2;
int loopIndex = 0;
boolean bContinue = true;
Runnable worker;
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYTHREADS, MYTHREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // **HERE IT WOULD BE A GLOBAL VARIABLE**
while (bContinue) // in this loop we'll constantly fill the pool list
{
    loopIndex++;
    List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number)
    for (String id : ids) {
        worker = new MyRunnable(id);
        executorPool.execute(worker);
    }
    // Wait while all workers are busy or the queue still holds enough pending tasks
    while (executorPool.getActiveCount() >= MYTHREADS || executorPool.getQueue().size() > Math.max(1, MYTHREADS - 2))
    {
        System.out.println("Pool size is now " + executorPool.getActiveCount() +
            " - queue size: " + executorPool.getQueue().size()
        );
        // The queue is nearly drained: stop waiting and fetch the next batch
        if (executorPool.getQueue().size() <= Math.max(1, MYTHREADS - 2)) {
            System.out.println("No more than " + Math.max(1, MYTHREADS - 2) + " tasks in queue ---> fill the queue");
            break;
        }
        TimeUnit.MILLISECONDS.sleep(2000);
    }
    if (loopIndex >= 3) {
        System.out.println("\nEnd the loop #" + loopIndex + " ===> STOOOP!\n");
        bContinue = false;
    }
    dbOffset += nbOfArticlesPerRequest;
}
executorPool.shutdown();
// Wait until all threads have finished
while (!executorPool.isTerminated()) {
    System.out.println("Pool size is now " + executorPool.getActiveCount() +
        " - queue size: " + executorPool.getQueue().size()
    );
    TimeUnit.MILLISECONDS.sleep(500);
}
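One possible alternative to polling the queue size by hand, sketched here under assumptions rather than offered as the definitive fix: give the ThreadPoolExecutor a bounded ArrayBlockingQueue together with ThreadPoolExecutor.CallerRunsPolicy. When the queue is full, execute() then runs the task on the submitting thread, which slows the producer down so the queue never holds more than its capacity. The names BoundedPoolDemo, fetchIds, handle and processAll are hypothetical, and fetchIds merely stands in for the real database query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; all names are illustrative.
final class BoundedPoolDemo {

    // Stand-in for the real database query (assumption): returns up to 'rows'
    // ids starting at 'offset', and an empty list once 'total' ids were served.
    static List<String> fetchIds(int offset, int rows, int total) {
        List<String> ids = new ArrayList<>();
        for (int i = offset; i < Math.min(offset + rows, total); i++) {
            ids.add(String.valueOf(i + 1));
        }
        return ids;
    }

    // Placeholder for the real per-entry job.
    static void handle(String id) {
        // real work would go here
    }

    /** Processes every id page by page and returns how many tasks ran. */
    static int processAll(int threads, int queueCapacity, int pageSize, int total)
            throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),      // bounded: at most queueCapacity pending tasks
                new ThreadPoolExecutor.CallerRunsPolicy());   // queue full => the caller runs the task itself

        int offset = 0;
        while (true) {
            List<String> ids = fetchIds(offset, pageSize, total);
            if (ids.isEmpty()) {
                break; // nothing left to read
            }
            for (String id : ids) {
                pool.execute(() -> {
                    handle(id);
                    processed.incrementAndGet();
                });
            }
            offset += pageSize;
        }

        pool.shutdown(); // called once, after the last page has been submitted
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed " + processAll(2, 4, 5, 23) + " ids");
    }
}
```

With this shape the executor is created once, shut down once at the very end, and awaitTermination() replaces the sleep loop on isTerminated(). The trade-off is that the reading thread occasionally executes a task itself, which may or may not be acceptable depending on how long a single job takes.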
Thanks in advance.
You can use invokeAll() to wait for the threads to finish. See: https://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish/36699136#36699136
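A minimal sketch of the invokeAll() suggestion, assuming each entry can be wrapped in a Callable. ExecutorService.invokeAll() blocks until every task in the given collection has completed, so the same executor can be reused for the next batch without calling shutdown() in between. The names InvokeAllDemo, runBatch and demo are hypothetical, and the "done-" result string is just a placeholder for real work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; all names are illustrative.
final class InvokeAllDemo {

    /** Runs one batch of ids on the shared executor and waits for all of them. */
    static List<String> runBatch(ExecutorService executor, List<String> ids)
            throws InterruptedException, ExecutionException {
        List<Callable<String>> tasks = new ArrayList<>();
        for (String id : ids) {
            tasks.add(() -> "done-" + id); // placeholder for the real per-entry job
        }
        List<String> results = new ArrayList<>();
        // invokeAll returns only once every task has completed;
        // the futures come back in the same order as the tasks.
        for (Future<String> future : executor.invokeAll(tasks)) {
            results.add(future.get());
        }
        return results;
    }

    /** Processes two batches on one executor, shutting it down only at the end. */
    static List<String> demo() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<String> all = new ArrayList<>();
            all.addAll(runBatch(executor, Arrays.asList("1", "2", "3")));
            all.addAll(runBatch(executor, Arrays.asList("4", "5")));
            return all;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

Note that invokeAll() still waits for the slowest task of each batch, so on its own it does not remove the idle time at the end of a batch described in the question; it only removes the need to recreate the executor.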