2015-05-18 15 views
0

,我遇到了一個很奇怪的問題: 我從一個大文件讀取和使用多個工作線程輸入工作(一些複雜的字符串匹配的任務) 。我使用LinkedBlockingQueue將數據傳輸到工作線程,並在worker類中使用易失性布爾標誌來在到達文件結尾時響應信號。Java線程不迴應我是新來的Java併發揮發性布爾標誌

但是,我不能讓工作線程正確停止。該程序的CPU使用率最終幾乎爲零,但程序不會正常終止。

簡化代碼如下。我已經刪除了真正的代碼,並用一個簡單的字計數器替換它們。但效果是一樣的。整個文件處理後,工作線程不會終止,並且主線程中的布爾標誌被設置爲true。

main

public class MultiThreadTestEntry 
{ 
    private static String inputFileLocation = "someFile"; 
    private static int numbOfThread = 3; 

    public static void main(String[] args) 
    { 
     int i = 0; 
     Worker[] workers = new Worker[numbOfThread]; 
     Scanner input = GetIO.getTextInput(inputFileLocation); 
     String temp = null; 
     ExecutorService es = Executors.newFixedThreadPool(numbOfThread); 
     LinkedBlockingQueue<String> dataQueue = new LinkedBlockingQueue<String>(1024); 

     for(i = 0 ; i < numbOfThread ; i ++) 
     { 
      workers[i] = new Worker(dataQueue); 
      workers[i].setIsDone(false); 
      es.execute(workers[i]); 
     } 

     try 
     { 
      while(input.hasNext()) 
      { 
       temp = input.nextLine().trim(); 
       dataQueue.put(temp); 
      } 
     } 
     catch (InterruptedException e) 
     { 
      Thread.currentThread().interrupt(); 
     } 

     input.close(); 

     for(i = 0 ; i < numbOfThread ; i ++) 
     { 
      workers[i].setIsDone(true); 
     } 

     es.shutdown(); 

     try 
     { 
      es.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
     } catch (InterruptedException e) 
     { 
      Thread.currentThread().interrupt(); 
     } 
    } 
} 

worker

public class Worker implements Runnable 
{ 
    private LinkedBlockingQueue<String> dataQueue = null; 
    private volatile boolean isDone = false; 

    public Worker(LinkedBlockingQueue<String> dataQueue) 
    { 
     this.dataQueue = dataQueue; 
    } 

    @Override 
    public void run() 
    { 
     String temp = null; 
     long count = 0; 
     System.out.println(Thread.currentThread().getName() + " running..."); 

     try 
     { 
      while(!isDone || !dataQueue.isEmpty()) 
      { 
       temp = dataQueue.take(); 
       count = temp.length() + count; 

       if(count%1000 == 0) 
       { 
        System.out.println(Thread.currentThread().getName() + " : " + count); 
       } 
      } 

      System.out.println("Final result: " + Thread.currentThread().getName() + " : " + count); 
     } 
     catch (InterruptedException e) 
     { 

     } 
    } 

    public void setIsDone(boolean isDone) 
    { 
     this.isDone = isDone; 
    } 
} 

,爲什麼出現這種情況有什麼建議類?

非常感謝。

+0

這是一個瘋狂的建議,只是因爲我一直在尋找API剛纔。試試一個AtomicBoolean來看看會發生什麼?顯然他們是併發準備好的 - 我沒有發佈這個答案,因爲我真的不知道結果會是什麼。 http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/atomic/AtomicBoolean.html – AK47

+4

這是因爲採取'()'上的空隊列等待,直到一個元件是可用的。查看[這個問題](http://stackoverflow.com/q/812342/3004881)的答案,找到在仍然使用'take()'的同時完成你想要做的事情的方法。 –

回答

0

正如丹·蓋茨已經說過你的工人take()等待,直到元素變得可用,但隊列可能是空的。

在你的代碼檢查隊列爲空,但沒有什麼能夠阻止其他工人讀取和檢查後的元素刪除元素。

如果隊列僅包含一個元素和t1t2兩個線程 以下可能發生:

 
t2.isEmpty(); // -> false 
t1.isEmpty(); // -> false 
t2.take(); // now the queue is empty 
t1.take(); // wait forever 

在這種情況下t1將等待「永遠」。

您可以通過使用poll代替take避免這種情況,檢查結果是null

public void run() 
{ 
    String temp = null; 
    long count = 0; 
    System.out.println(Thread.currentThread().getName() + " running..."); 

    try 
    { 
     while(!isDone || !dataQueue.isEmpty()) 
     { 
      temp = dataQueue.poll(2, TimeUnit.SECONDS); 
      if (temp == null) 
       // re-check if this was really the last element 
       continue; 

      count = temp.length() + count; 

      if(count%1000 == 0) 
      { 
       System.out.println(Thread.currentThread().getName() + " : " + count); 
      } 
     } 

     System.out.println("Final result: " + Thread.currentThread().getName() + " : " + count); 
    } 
    catch (InterruptedException e) 
    { 
     // here it is important to restore the interrupted flag! 
     Thread.currentThread().interrupt(); 
    } 
}