2011-07-01 25 views
2

我有一個包含工作項的隊列,我希望多個線程在這些項目上並行工作。 處理工作項目時,可能會產生新的工作項目。我遇到的問題是,我無法找到解決辦法,如何確定我是否完成。工人看起來是這樣的:如何處理可能會創建新工作項的多個工作線程

public class Worker implements Runnable { 
    public void run() { 
    while (true) { 
     WorkItem item = queue.nextItem(); 
     if (item != null) { 
     processItem(item); 
     } 
     else { 
     // the queue is empty, but there may still be other workers 
     // processing items which may result in new work items 
     // how to determine if the work is completely done? 
     } 
    } 
    } 
} 

這似乎是一個非常簡單的問題,但實際上我很茫然。什麼是最好的實施方式?

感謝

澄清: 工作線程必須終止,一旦他們沒有被處理的項目,但只要其中至少有一個是仍在工作,他們必須等待,因爲這可能會導致新的工作項目。

+1

通常您會等待某種同步對象,並在每次將新項目添加到隊列時收到通知。 – sje397

回答

0

我建議等待/通知電話。在其他情況下,工作線程會等待對象,直到隊列通知需要做更多工作。當工作人員創建新項目時,它會將其添加到隊列中,並且隊列調用會通知工作人員正在等待的對象。其中一人將醒來消耗新的物品。

類Object的wait,notify和notifyAll方法支持從一個線程到另一個線程的有效控制轉移。線程可以使用等待來暫停自己,而不是簡單地「旋轉」(反覆地鎖定和解鎖對象以查看某個內部狀態是否發生了變化),直到另一個線程使用通知喚醒爲止。這對於線程具有生產者 - 消費者關係(主動協作以實現共同目標)而不是互斥關係(試圖避免共享公共資源時發生衝突)的情況尤爲適用。

Source: Threads and Locks

+0

嗨。感謝您的回答。但是當所有的工人都在等待時,就不會有更多的工作,所以在這種情況下,我需要退出循環。而正是這是我的問題,即我需要線程終止,一旦他們都沒有工作。 – Stefan

+0

有趣。隊列需要知道所有工人的狀態。我會用while(!queue.workComplete())替換while(true)行。每當請求新項目並且當前沒有新工作時,請讓隊列檢查工作人員的狀態。如果他們都報告他們已經完成了他們的任務,請爲workComplete()返回true,然後在工人等待的對象上調用NotifyAll。他們都醒了,發現沒有更多的工作要來了,終止。可能需要一些同步技巧來解決,但我相當確定它可以工作。 – Coeffect

+0

謝謝,這聽起來可行,我會考慮一下。 – Stefan

0

我想看看什麼水平高於等待/通知。要正確並避免死鎖是非常困難的。你看過java.util.concurrent.CompletionService<V>?您可以有一個簡單的經理線程來輪詢服務和結果,這些結果可能包含或不包含新的工作項目。

0

使用BlockingQueue包含的項目與同步組跟蹤的所有元素來處理沿着當前正在處理:

BlockingQueue<WorkItem> bQueue; 
Set<WorkItem> beingProcessed = new Collections.synchronizedSet(new HashMap<WorkItem>()); 
bQueue.put(workItem); 
... 

// the following runs over many threads in parallel 
while (!(bQueue.isEmpty() && beingProcessed.isEmpty())) { 
    WorkItem currentItem = bQueue.poll(50L, TimeUnit.MILLISECONDS); // null for empty queue 
    if (currentItem != null) { 
    beingProcessed.add(currentItem); 
    processItem(currentItem); // possibly bQueue.add(newItem) is called from processItem 
    beingProcessed.remove(currentItem); 
    } 
} 

編輯:作爲@Hovercraft完整的鰻魚建議,一個ExecutorService可能是你應該用什麼。隨時隨地添加新任務。您可以半忙等待以executorService.awaitTermination(time, timeUnits)定期終止所有任務,並在此之後終止所有線程。

+0

謝謝,這非常有幫助。不幸的是,由於代表低,我無法贊成。 – Stefan

0

下面是排隊的開始,以解決您的問題。基本上,您需要在過程工作中跟蹤新工作

public class WorkQueue<T> { 
    private final List<T> _newWork = new LinkedList<T>(); 
    private int _inProcessWork; 

    public synchronized void addWork(T work) { 
     _newWork.add(work); 
     notifyAll(); 
    } 

    public synchronized T startWork() throws InterruptedException { 
     while(_newWork.isEmpty() && (_inProcessWork > 0)) { 
      wait(); 
      if(!_newWork.isEmpty()) { 
       _inProcessWork++; 
       return _newWork.remove(0); 
      } 
     } 
     // everything is done 
     return null; 
    } 

    public synchronized void finishWork() { 
     _inProcessWork--; 
     if((_inProcessWork == 0) && _newWork.isEmpty()) { 
      notifyAll(); 
     } 
    } 
} 

你的工人將大致如下:

public class Worker { 
    private final WorkQueue<T> _queue; 

    public void run() { 
    T work = null; 
    while((work = _queue.startWork()) != null) { 
     try { 
     // do work here... 
     } finally { 
     _queue.finishWork(); 
     } 
    } 
    } 
} 

的一個技巧是,你需要添加的第一個工作項_before啓動任何人員(否則他們將全部立即退出)。

+0

非常感謝您的努力,這看起來相當不錯。不幸的是我還不能滿足。 – Stefan