
Reading a large number of files with a producer causes CPU usage to hit 100%

I wrote a simple producer-consumer program to help me accomplish the following task:

  1. Read files from a directory that contains 500,000 TSV (tab-separated) files.
  2. Parse each file into a data structure and put it into a blocking queue.
  3. Consume the queue with consumers that query the database.
  4. Compare the two hash maps and, if there are differences, write the differences to a file.

When I run the program, my CPU consumption spikes to 100%, even with only 5 threads. Could this be because I'm using a single producer to read the files?

Example of a file (tab-separated):

Column1 Column2 Column3 Column4 Column5 
A   1   *   -   - 
B   1   *   -   - 
C   1   %   -   - 

Producer

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable { 
private BlockingQueue<Map<String, Map<String, String>>> m_Queue; 
private String m_Directory; 

public Producer(BlockingQueue<Map<String, Map<String, String>>> i_Queue, String i_Directory) 
{ 
    m_Queue = i_Queue; 
    m_Directory = i_Directory; 
} 

@Override 
public void run() 
{ 
    if (Files.exists(Paths.get(m_Directory))) 
    { 
     File[] files = new File(m_Directory).listFiles(); 

     if (files != null) 
     { 
      for (File file : files) 
      { 
       Map<String, String> map = new HashMap<>(); 
       try (BufferedReader reader = new BufferedReader(new FileReader(file))) 
       { 
        String line, lastcolumn3 = "", column1 = "", column2 = "", column3 = ""; 
        while ((line = reader.readLine()) != null) 
        { 
         //Skip column header 
         if (!Character.isLetter(line.charAt(0))) 
         { 
          String[] splitLine = line.split("\t"); 

          column1 = splitLine[0].replace("\"", ""); 
          column2 = splitLine[1].replace("\"", ""); 
          column3 = splitLine[2].replace("\"", ""); 

          if (!lastcolumn3.equals(column3)) 
          { 
           map.put(column3, column1); 
           lastcolumn3 = column3; 
          } 
         } 
        } 

        map.put(column3, column1); 

        //Column 1 is always the same per file, it'll be the key. Column2 and Column3 are stored as the value (as a key-value pair) 
        Map<String, Map<String, String>> mapPerFile = new HashMap<>(); 
        mapPerFile.put(column2, map); 

        m_Queue.put(mapPerFile); 
       } 
       catch (IOException | InterruptedException e) 
       { 
        System.out.println(file); 
        e.printStackTrace(); 
       } 
      } 
     } 
    } 
}
}

Consumer

import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable { 
private HashMap<String, String> m_DBResults; 
private BlockingQueue<Map<String, Map<String, String>>> m_Queue; 
private Map<String, Map<String, String>> m_DBResultsPerFile; 
private String m_Column1; 
private int m_ThreadID; 

public Consumer(BlockingQueue<Map<String, Map<String, String>>> i_Queue, int i_ThreadID) 
{ 
    m_Queue = i_Queue; 
    m_ThreadID = i_ThreadID; 
} 

@Override 
public void run() 
{ 
    try 
    { 
     while ((m_DBResultsPerFile = m_Queue.poll()) != null) 
     { 
      //Column1 is always the same, only need the first entry. 
      m_Column1 = m_DBResultsPerFile.keySet().toArray()[0].toString(); 

      //Queries DB and puts returned data into m_DBResults 
      queryDB(m_Column1); 

      //Write the difference, if any, per thread into a file. 
      writeDifference(); 
     } 
    } 
    catch (Exception e) 
    { 
     e.printStackTrace(); 
    } 
} 

private void writeDifference() 
{ 
    MapDifference<String, String> difference = Maps.difference(m_DBResultsPerFile.get(m_Column1), m_DBResults); 

    if (difference.entriesOnlyOnLeft().size() > 0 || difference.entriesOnlyOnRight().size() > 0) 
    { 
     try (BufferedWriter writer = new BufferedWriter(new FileWriter(String.format("thread_%d.tsv", m_ThreadID), true))) 
     { 
      if (difference.entriesOnlyOnLeft().size() > 0) 
      { 
       writer.write(String.format("%s\t%s\t", "Missing", m_Column1)); 
       for (Map.Entry<String, String> entry : difference.entriesOnlyOnLeft().entrySet()) 
       { 
        writer.write(String.format("[%s,%s]; ", entry.getKey(), entry.getValue())); 
       } 

       writer.write("\n"); 
      } 
      if (difference.entriesOnlyOnRight().size() > 0) 
      { 
       writer.write(String.format("%s\t%s\t", "Extra", m_Column1)); 
       for (Map.Entry<String, String> entry : difference.entriesOnlyOnRight().entrySet()) 
       { 
        writer.write(String.format("[%s,%s]; ", entry.getKey(), entry.getValue())); 
       } 

       writer.write("\n"); 
      } 
     } 
     catch (IOException e) 
     { 
      e.printStackTrace(); 
     } 
    } 
}
}

Main

public static void main(String[] args) { 
//Assumption: pool sized for 1 producer + 10 consumers (the threadPool declaration was not shown in the original post). 
ExecutorService threadPool = Executors.newFixedThreadPool(11); 
BlockingQueue<Map<String, Map<String,String>>> queue = new LinkedBlockingQueue<>(); 

//Start the reader thread. 
threadPool.execute(new Producer(queue, args[0])); 

//Create configurable threads. 
for (int i = 0; i < 10; i++) { 
    threadPool.execute(new Consumer(queue, i + 1)); 
} 

threadPool.shutdown(); 
System.out.println("INFO: Shutting down threads."); 

try { 
    threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
    System.out.println("INFO: Threadpool terminated successfully."); 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
}
}
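One thing worth noting about this wiring: LinkedBlockingQueue with no capacity argument is unbounded, so if the producer parses files faster than the consumers can query the database, the parsed maps pile up in memory. Below is a minimal sketch of a bounded alternative, where put() blocks once the queue is full and so throttles the producer; the capacity of 100 and the class name are arbitrary assumptions, not part of the original code.

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueSketch {
    public static void main(String[] args) {
        //Bounded queue: put() blocks once 100 parsed files are waiting,
        //so the producer cannot run arbitrarily far ahead of the consumers.
        BlockingQueue<Map<String, Map<String, String>>> queue =
                new ArrayBlockingQueue<>(100);
        //... wire up the Producer and Consumers exactly as in main above ...
    }
}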

Is high CPU utilization a bad thing? –


Do you want your job to run more slowly? –


If you think you're using more CPU than you should (i.e., the total running time is too high), then your algorithm is probably bad. Profile the code to look for likely problems. – Andreas

Answer


Your CPU usage is most likely due to this:

while ((m_DBResultsPerFile = m_Queue.poll()) != null) 

The poll method does not block; it returns immediately. So you end up executing that loop millions of times per second.

You should use take(), which will actually wait until an element is available:

while ((m_DBResultsPerFile = m_Queue.take()) != null) 

The documentation for BlockingQueue sums all of this up nicely, in a way that (in my opinion) avoids any confusion.
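One practical wrinkle, since take() never returns null (see the comment below): the consumers need another way to know the work is finished. A common approach is a poison-pill sentinel that the producer puts once per consumer after the last file. A minimal sketch follows; QueueShutdown, POISON_PILL, signalDone and drain are illustrative names rather than anything from the original code.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

public class QueueShutdown {
    //Shared sentinel ("poison pill") that marks the end of the work.
    public static final Map<String, Map<String, String>> POISON_PILL = new HashMap<>();

    //Producer side: after all files have been read, signal each consumer once.
    public static void signalDone(BlockingQueue<Map<String, Map<String, String>>> queue,
                                  int consumerCount) throws InterruptedException {
        for (int i = 0; i < consumerCount; i++) {
            queue.put(POISON_PILL);
        }
    }

    //Consumer side: take() blocks until an element is available, so the thread
    //sleeps instead of spinning; the loop exits when the sentinel arrives.
    public static void drain(BlockingQueue<Map<String, Map<String, String>>> queue)
            throws InterruptedException {
        while (true) {
            Map<String, Map<String, String>> item = queue.take();
            if (item == POISON_PILL) {
                break;
            }
            //... process item here (queryDB and writeDifference in the original Consumer) ...
        }
    }
}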


'take()' will never actually return null. – Andreas


I tried replacing poll() with take() and didn't see much of a difference. I'll see if I can tune the producer class better. – ocp1000
