2017-01-06 71 views
0

假設我有一個很大的隊列,就像10,000個對象一樣。我想用5個工作線程創建一個線程池,每個工作線程從隊列中移除一個項並對其進行處理,直到隊列爲空。隊列中的多線程作業 - 作業太多?

我擔心的是,通過使用我在不同地方看到的設置,我最終創建了10,000個工作,但是通過5名工作人員執行。我覺得這不是真正可擴展的 - 隊列已經有10,000個項目,現在我在堆棧上還有10,000個作業(即使它們沒有被正在執行,這看起來像是一個內存問題)。

這似乎是這個答案的建議:https://stackoverflow.com/a/9916299/774359 - 這是「// now submit our jobs」部分,讓我很擔心。是否有效地將隊列倒入工作中是一個問題?

這裏是什麼,我至今一個簡單的例子:)

在main(:

ExecutorService executor = Executors.newFixedThreadPool(5); 
while(!hugeQueue.isEmpty()) { 
    String work = hugeQueue.remove(); 
    System.out.println("Creating job for " + work); 
    Runnable worker = new Worker(work); 
    executor.execute(worker); 
} 

在Worker類:​​

public Worker(String itemFromQueue) { this.job = itemFromQueue; } 

@Override 
public void run() { 
    System.out.println("Working on " + this.itemFromQueue); 
    //Do actual work 
} 

hugeQueue包含萬個的數字,我查看所有10,000個「創建作業」消息,然後查看所有10,000個「正在處理」消息。我認爲如果只有5個職位同時創建,然後他們就可以開展工作 - 當一個線程打開時,它會創建另一個工作,然後工作。這樣,堆棧中就不會有10,000個作業。我將如何實現這一目標?我是否正確地思考這個架構?


編輯,包括基於一個答案更新的信息:

@ seneque的代碼沒有編譯通俗易懂,所以我做了一些細微的變化 - 不幸的是,這個輸出是工人剛剛創建,而沒有實際的工作。

在main():

int numOfThreads = 5; 
BlockingQueue<Integer> hugeQueue = new LinkedBlockingQueue<>(); 
for(int x = 0; x < 1000; x++) { hugeQueue.add(x); } 

ExecutorService executor = Executors.newFixedThreadPool(numOfThreads); 
LongRunningWorker longRunningWorker = new LongRunningWorker(); 

for(int i = 0; i < numOfThreads ; i++) { 
    System.out.println("Created worker #" + i); 
    executor.submit(longRunningWorker); 
} 
System.out.println("Done"); 

在LongRunningWorker:

public class LongRunningWorker implements Runnable { 
    BlockingQueue<Integer> workQueue; 
    void spiderExmaple(BlockingQueue<Integer> workQueue) { 
     this.workQueue = workQueue; 
    } 

    @Override 
    public void run() { 
     try { 
      while(workQueue.poll(3, TimeUnit.SECONDS) != null) { 
       Integer work = workQueue.remove(); 
       System.out.println("Working on " + work); 
       new Worker(work).run(); 
      } 
     } catch (InterruptedException e) { e.printStackTrace(); } 
    } 
} 

在工人:

public class Worker implements Runnable{ 
    Integer work; 
    Worker(Integer x) { this.work = x; } 

    @Override 
    public void run() { 
     System.out.println("Finished work on " + this.work); 

    } 
} 
+0

甲ThreadPoolExecutorService(其是在創建什麼當調用Executors.newFixedThreadPool(5))具有內部隊列。所以在這裏,你從一個隊列中取出另一個隊列,這個隊列將被5個線程讀取 – seneque

+0

@seneque Right - 這意味着,對於我自己的10,000個項目隊列,我將有效地創建第二個隊列相同的大小,正確?對象是不同的,但我的問題是這是否是一個可行的解決方案給予雙重內存要求 – Jake

+0

而不是,如果hugeQueue是一個阻塞隊列,你可以讓你的5線程引用你的隊列並從隊列中輪詢。 – seneque

回答

1

一種解決方案將是有你五個直接輪詢隊列。

BlockingQueue<String> hugeQueue = ... 
ExecutorService executor = Executors.newFixedThreadPool(5); 
LongRunningWorker longRunningWorker = new LongRunningWorker(hugeQueue); 
for(int i = 0; i < 5 ; i++) { 
    executor.submit(longRunningWorker) 
} 

然後LongRunningWorker的定義如下:

class LongRunningWorker(BlockingQueue<String> workQueue) extends Runnable { 
    final BlockingQueue<String> workQueue; 
    LongRunningWorker(BlockingQueue<String> workQueue) { 
     this.workQueue = workQueue; 
    } 

    public void run() { 
     while((String work = workQueue.poll(3, TimeUnit.Second) != null) { 
      try { 
       new Worker(work).run(); 
      } catch (Exception e) { 
       // 
      } 
     } 
    } 
} 
+0

完美無缺,正是我一直在尋找的。謝謝! – Jake

+0

它看起來像代碼不工作,檢查出來後 - 任何建議?它只是表明它創造了工人的事實,但工作本身從未開始。 – Jake

+0

新工人(工作).run(); < - 所以它應該做的工作 – seneque