2015-11-22 120 views
3

我有一個Java多線程文件搜尋器,我正在做一個問題。我的問題是,我有一個workQueue,它是一個linkedBlockingQueue,它包含我想要用我的線程爬過的文件的名稱,每個線程將從workQueue中take(),並在掃描整個文件的同時可能會將put()另一個文件名放入workQueue(這是一個依賴檢查程序)。所以我從來沒有真正確定什麼時候工作全部完成,並且所有線程最終都會進入等待狀態,當他們嘗試從(最終)空workQueue中嘗試take()時。初學者Java多線程問題

所以我想我的問題是,是否有一種有效的方法來終止所有的線程(當所有的線程都進入等待狀態時)?目前我只在主線程上使用sleep(),然後在所有的工作線程中使用interrupt()

對不起,如果這個問題聽起來很混亂。

回答

0

您可以使用下面的方法。如果需要,添加觀察者模式。 或者簡單地說 - 不是用死信息包發送信號,而是收集等待線程列表,然後中斷()它們。

public class AccessCountingLinkedPrioQueue<T> { 

    private final LinkedBlockingQueue<T> mWrappingQueue     = new LinkedBlockingQueue<>(); 
    private final Object     mSyncLockObj     = new Object(); 

    private final int      mMaxBlockingThreads; 
    private final T       mDeathSignallingObject; 

    private volatile int     mNumberOfThreadsInAccessLoop = 0; 

    public AccessCountingLinkedPrioQueue(final int pMaxBlockingThreads, final T pDeathSignallingObject) { 
     mMaxBlockingThreads = pMaxBlockingThreads; 
     mDeathSignallingObject = pDeathSignallingObject; 
    } 

    public T take() throws InterruptedException { 
     final T retVal; 
     synchronized (mSyncLockObj) { 
      ++mNumberOfThreadsInAccessLoop; 
     } 
     synchronized (mWrappingQueue) { 
      if (mNumberOfThreadsInAccessLoop >= mMaxBlockingThreads && mWrappingQueue.isEmpty()) signalDeath(); 
      retVal = mWrappingQueue.take(); 
     } 
     synchronized (mSyncLockObj) { 
      --mNumberOfThreadsInAccessLoop; 
     } 
     return retVal; 
    } 

    private void signalDeath() { 
     for (int i = 0; i < mMaxBlockingThreads; i++) { 
      mWrappingQueue.add(mDeathSignallingObject); 
     } 
    } 

    public int getNumberOfThreadsInAccessLoop() { 
     return mNumberOfThreadsInAccessLoop; 
    } 
} 

class WorkPacket { 
    // ... your content here 
} 

class MultiThreadingBoss { 
    static public final WorkPacket DEATH_FROM_ABOVE = new WorkPacket(); 

    public MultiThreadingBoss() { 
     final int THREADS = 7; 
     final AccessCountingLinkedPrioQueue<WorkPacket> prioQ = new AccessCountingLinkedPrioQueue<>(THREADS, DEATH_FROM_ABOVE); 
     for (int i = 0; i < THREADS; i++) { 
      final ThreadedWorker w = new ThreadedWorker(prioQ); 
      new Thread(w).start(); 
     } 
    } 
} 

class ThreadedWorker implements Runnable { 
    private final AccessCountingLinkedPrioQueue<WorkPacket> mPrioQ; 

    public ThreadedWorker(final AccessCountingLinkedPrioQueue<WorkPacket> pPrioQ) { 
     mPrioQ = pPrioQ; 
    } 

    @Override public void run() { 
     while (true) { 
      try { 
       final WorkPacket p = mPrioQ.take(); 
       if (p == MultiThreadingBoss.DEATH_FROM_ABOVE) break; // or return 

       // ... do your normal work here 

      } catch (final InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 
+0

真誠的thanx爲downvotes人。但是在上帝的名下,請留下評論爲什麼這是不好的,所以我們都學習。 – JayC667

+1

不確定downvotes,但我會用ExecutorService管理線程。 –

1

我以前有這個問題,我發現的唯一辦法就是到一個特殊的標記對象發送到BlockingQueue。當隊列.take()的對象,如果這是標記,那麼Thread自己結束。

我試過其他解決方案,比如喚醒線程並檢測異常,但沒有成功。

1

有一種叫做Poison Pill的模式,對此很有幫助。基本上,當生產者完成後,在隊列中插入一個特殊值,告訴消費者停止。您可以爲每個消費者插入一個藥丸,或者,一旦消費者獲得毒藥丸,將其返回到下一個消費者的隊列。因爲它聽起來就像你剛剛入隊的字符串,像

public static final String POISON_PILL = "DONE"; 

或者在Java中8,使用Optional來包裝你的價值觀,那麼已經不存在是避孕藥。

BlockingQueue<Optional<...>> queue; 

另一種選擇是使用ExecutorService(這實際上是由BlockingQueue支持),並利用每個文件作爲自己的任務,然後使用executorService.shutdown()時,即可大功告成。與此相關的問題是,它會比你需要的代碼更緊密地耦合你的代碼,並且使得重用像數據庫和HTTP連接之類的資源變得更加困難。

我會避免中斷您的工作人員發出信號,因爲這可能會導致阻止IO操作失敗。