2013-12-07 43 views
0

我想實現在我的代碼中使用一個隊列。關鍵是我希望它能夠打印出文件中的字數總量,這意味着我需要它在完成時將所有結果添加到一起。使用BlockingQueue的

當前,我的程序所做的是,我有一個讀取器運行文件,並返回一個包含文件名稱的字符串以及其中的單詞量。然後我使用我的主要方法來運行args數組中給出的每個參數的for循環。每次我們通過一個新文件來檢查有多少單詞時,我們就會將它變成一個新線索。

public static void main(final String[] args) { 
    Thread t = null; 
    if (args.length >= 1) { 
     String destinationFileName = args[(args.length-1)]; 
      for (int l = 0; l < (args.length); l++) { 
       final int q = l; 
       final Thread y = t; 
       Runnable r = new Runnable() { 
        public void run() { 
         String res = readTextFile(args[q]); 
         System.out.println(res); 
        } 
       }; 
       t = new Thread(r); 
       t.start(); 
      } 
    } else { 
     System.err.println("Not enough input files"); 
    } 
} 

所以,我怎麼做一個隊列,不知怎的,使它們互相等待,因此,它不會使增加了結果的確切同一時間的錯誤呢?

回答

0

阻塞隊列在這裏似乎沒有必要。只要有每個線程的結果添加到一個線程安全的列表,可以構建這樣的:

final List<String> results = 
     Collections.synchronizedList(new ArrayList<String>()); 

接下來,你要等到所有線程都聚集結果之前完成。您可以通過在每個線程上調用join來完成此操作。每個線程加入一個名爲threads列表中,那麼一旦所有的線程已經啓動,調用此:

for(Thread t : threads) { 
    t.join(); 
} 

此代碼將有效地等待每一個線程在移動之前完成。

0

這是一個很常見的例子當多個線程所必需的一些處理,比如從磁盤讀取文件IO操作,我想這是家教的目的,對於現實生活中的例子,考慮看地圖降低Hadoop等

框架看到類似的任務是如何使用Hadoop here

但是,隨着僞例子做:

import java.util.HashMap; 
import java.util.Map; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 

class ConsumerProducer implements Runnable { 
    private final BlockingQueue<String> map; 
    private final BlockingQueue<Map<String, Integer>> reduce; 

    ConsumerProducer(BlockingQueue<String> map, 
      BlockingQueue<Map<String, Integer>> reduce) { 
     this.map = map; 
     this.reduce = reduce; 
    } 

    public void run() { 
     try { 
      while (true) { 
       Map<String, Integer> wordToOccurrences = this.consume(map 
         .take()); 
       this.produce(wordToOccurrences); 
      } 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    private void produce(Map<String, Integer> wordToOccurrences) 
      throws InterruptedException { 
     reduce.put(wordToOccurrences); 
    } 

    public Map<String, Integer> consume(String fileName) { 
     // read the file and return 'word' -> number of occurrences 
     return new HashMap<String, Integer>(); 
    } 
} 

class Setup { 

    public static void main(String[] args) throws InterruptedException { 
     BlockingQueue<String> map = new LinkedBlockingQueue<String>(); 
     BlockingQueue<Map<String, Integer>> reduce = new LinkedBlockingQueue<Map<String, Integer>>(); 

     for (String fileName : args) { 
      map.put(fileName); 
      // assuming every thread process single file, for other options see 
      // http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html 
      ConsumerProducer c = new ConsumerProducer(map, reduce); 
      new Thread(c).start(); 
     } 

     for (int i = 0; i < args.length; i++) { 
      Map<String, Integer> wordToOccurrences = reduce.take(); 
      // start consuming results 
     } 

     // print merged map of total results 
    } 
}